Skip to content

Commit

Permalink
Add support for client-initiated connection settings request
Browse files Browse the repository at this point in the history
- Implemented spec change open-telemetry/opamp-spec#162
- Added ability for the client to request connection settings.
- Added CSR flow example to demonstrate the new capability.
  • Loading branch information
tigrannajaryan committed Oct 18, 2023
1 parent c1931d7 commit 4515106
Show file tree
Hide file tree
Showing 9 changed files with 577 additions and 135 deletions.
9 changes: 9 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,13 @@ type OpAMPClient interface {
// May be called anytime after Start(), including from OnMessage handler.
// nil values are not allowed and will return an error.
SetPackageStatuses(statuses *protobufs.PackageStatuses) error

// RequestConnectionSettings sets a ConnectionSettingsRequest. The ConnectionSettingsRequest
// will be included in the next AgentToServer message sent to the Server.
// Used for client-initiated connection setting acquisition flows.
// It is the responsibility of the caller to ensure that the Server supports
// AcceptsConnectionSettingsRequest capability.
// May be called before or after Start().
// May be also called from OnMessage handler.
RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error
}
307 changes: 187 additions & 120 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,150 +682,217 @@ func TestAgentIdentification(t *testing.T) {
})
}

func TestConnectionSettings(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
hash := []byte{1, 2, 3}
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}
metricsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://metrics.com"}
tracesSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://traces.com"}
logsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://logs.com"}
otherSettings := &protobufs.OtherConnectionSettings{DestinationEndpoint: "http://other.com"}

var rcvStatus int64
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg != nil {
atomic.AddInt64(&rcvStatus, 1)

return &protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Hash: hash,
Opamp: opampSettings,
OwnMetrics: metricsSettings,
OwnTraces: tracesSettings,
OwnLogs: logsSettings,
OtherConnections: map[string]*protobufs.OtherConnectionSettings{
"other": otherSettings,
func TestServerOfferConnectionSettings(t *testing.T) {
testClients(
t, func(t *testing.T, client OpAMPClient) {
hash := []byte{1, 2, 3}
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}
metricsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://metrics.com"}
tracesSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://traces.com"}
logsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://logs.com"}
otherSettings := &protobufs.OtherConnectionSettings{DestinationEndpoint: "http://other.com"}

var rcvStatus int64
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg != nil {
atomic.AddInt64(&rcvStatus, 1)

return &protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Hash: hash,
Opamp: opampSettings,
OwnMetrics: metricsSettings,
OwnTraces: tracesSettings,
OwnLogs: logsSettings,
OtherConnections: map[string]*protobufs.OtherConnectionSettings{
"other": otherSettings,
},
},
},
}
}
return nil
}
return nil
}

var gotOpampSettings int64
var gotOwnSettings int64
var gotOtherSettings int64

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
assert.True(t, proto.Equal(metricsSettings, msg.OwnMetricsConnSettings))
assert.True(t, proto.Equal(tracesSettings, msg.OwnTracesConnSettings))
assert.True(t, proto.Equal(logsSettings, msg.OwnLogsConnSettings))
atomic.AddInt64(&gotOwnSettings, 1)

assert.Len(t, msg.OtherConnSettings, 1)
assert.True(t, proto.Equal(otherSettings, msg.OtherConnSettings["other"]))
atomic.AddInt64(&gotOtherSettings, 1)
},
var gotOpampSettings int64
var gotOwnSettings int64
var gotOtherSettings int64

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
assert.True(t, proto.Equal(metricsSettings, msg.OwnMetricsConnSettings))
assert.True(t, proto.Equal(tracesSettings, msg.OwnTracesConnSettings))
assert.True(t, proto.Equal(logsSettings, msg.OwnLogsConnSettings))
atomic.AddInt64(&gotOwnSettings, 1)

assert.Len(t, msg.OtherConnSettings, 1)
assert.True(t, proto.Equal(otherSettings, msg.OtherConnSettings["other"]))
atomic.AddInt64(&gotOtherSettings, 1)
},

OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
atomic.AddInt64(&gotOpampSettings, 1)
return nil
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
atomic.AddInt64(&gotOpampSettings, 1)
return nil
},
},
},
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnTraces |
protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics |
protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnLogs |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOtherConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnTraces |
protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics |
protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnLogs |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOtherConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))
assert.NoError(t, client.Start(context.Background(), settings))

eventually(t, func() bool { return atomic.LoadInt64(&gotOpampSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOwnSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOtherSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&rcvStatus) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOpampSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOwnSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOtherSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&rcvStatus) == 1 })

// Shutdown the Server.
srv.Close()
// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
},
)
}

func TestReportAgentDescription(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
func TestClientRequestConnectionSettings(t *testing.T) {
testClients(
t, func(t *testing.T, client OpAMPClient) {
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()
var srvReceivedRequest int64
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg != nil && msg.ConnectionSettingsRequest != nil {
atomic.AddInt64(&srvReceivedRequest, 1)
return &protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: opampSettings,
},
}
}
return nil
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig,
}
prepareClient(t, &settings, client)
var clientGotOpampSettings int64

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
atomic.AddInt64(&clientGotOpampSettings, 1)
return nil
},
},
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full AgentDescription.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})
client.RequestConnectionSettings(&protobufs.ConnectionSettingsRequest{})

// Client --->
// Trigger a status report.
_ = client.UpdateEffectiveConfig(context.Background())
// Wait until server receives the request.
eventually(t, func() bool { return atomic.LoadInt64(&srvReceivedRequest) == 1 })

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed AgentDescription.
assert.Nil(t, msg.AgentDescription)
// Wait until client receives the server's response.
eventually(t, func() bool { return atomic.LoadInt64(&clientGotOpampSettings) == 1 })

assert.EqualValues(t, 1, msg.SequenceNum)
// Shutdown the Server.
srv.Close()

// Ask client for full AgentDescription.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState),
// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
},
)
}

func TestReportAgentDescription(t *testing.T) {
testClients(
t, func(t *testing.T, client OpAMPClient) {
// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig,
}
})
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(
func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full AgentDescription.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
},
)

// Server has requested the client to report, so there will be another message
// coming to the Server.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full AgentDescription
// because the Server asked for it.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})
// Client --->
// Trigger a status report.
_ = client.UpdateEffectiveConfig(context.Background())

// Shutdown the Server.
srv.Close()
// ---> Server
srv.Expect(
func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed AgentDescription.
assert.Nil(t, msg.AgentDescription)

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
assert.EqualValues(t, 1, msg.SequenceNum)

// Ask client for full AgentDescription.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState),
}
},
)

// Server has requested the client to report, so there will be another message
// coming to the Server.
// ---> Server
srv.Expect(
func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full AgentDescription
// because the Server asked for it.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
},
)
}

func TestReportAgentHealth(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (c *httpClient) SetAgentDescription(descr *protobufs.AgentDescription) erro
return c.common.SetAgentDescription(descr)
}

func (c *httpClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error {
return c.common.RequestConnectionSettings(request)
}

// SetHealth implements OpAMPClient.SetHealth.
func (c *httpClient) SetHealth(health *protobufs.ComponentHealth) error {
return c.common.SetHealth(health)
Expand Down
10 changes: 10 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return nil
}

func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error {
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.ConnectionSettingsRequest = request
},
)
c.sender.ScheduleSend()
return nil
}

// SetHealth sends a status update to the Server with the new agent health
// and remembers the health in the client state so that it can be sent
// to the Server when the Server asks for it.
Expand Down
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error
return c.common.SetAgentDescription(descr)
}

func (c *wsClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error {
return c.common.RequestConnectionSettings(request)
}

func (c *wsClient) SetHealth(health *protobufs.ComponentHealth) error {
return c.common.SetHealth(health)
}
Expand Down
Loading

0 comments on commit 4515106

Please sign in to comment.