Skip to content

Commit

Permalink
Implement plain HTTP Transport in OpAMPServer
Browse files Browse the repository at this point in the history
This adds plain HTTP request support to OpAMPServer implementation.
Implements spec change: open-telemetry/opamp-spec#70
  • Loading branch information
tigrannajaryan committed Apr 7, 2022
1 parent 405adf8 commit 07090b0
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 11 deletions.
8 changes: 8 additions & 0 deletions server/callbacks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"net/http"

"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -32,6 +33,13 @@ func (c CallbacksStruct) OnConnected(conn types.Connection) {
func (c CallbacksStruct) OnMessage(conn types.Connection, message *protobufs.AgentToServer) {
if c.OnMessageFunc != nil {
c.OnMessageFunc(conn, message)
} else {
// From spec https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
// "The Server MUST respond to the status report by sending a ServerToAgent message."
// We will send an empty response since there is no user-defined callback to handle it.
conn.Send(context.Background(), &protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
})
}
}

Expand Down
70 changes: 70 additions & 0 deletions server/httpconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package server

import (
"context"
"errors"
"net"
"net/http"

"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
)

var errInvalidHTTPConnection = errors.New(
"httpConnection is not valid (are you trying to Send() more than one message?)",
)

// httpConnection represents a logical OpAMP connection over a plain HTTP connection.
// Note: we are distinguishing the OpAMP connection from HTTP connection
// here. Only one response is possible to send when using plain HTTP connection.
// The HTTP connection that was used for this OpAMP connection may remain open if the
// client uses Keep-Alive option and that same HTTP connection may be reused for a future
// OpAMP connection.
// From OpAMP perspective the connection represented by httpConnection instance is
// used to receive exactly one AgentToServer message and to send exactly one
// ServerToAgent message. After sending the ServerToAgent message httpConnection
// instance is no longer valid.
type httpConnection struct {
httpResponseWriter http.ResponseWriter
callbacks types.Callbacks
conn net.Conn
}

func (c httpConnection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}

var _ types.Connection = (*httpConnection)(nil)

func (c httpConnection) Send(_ context.Context, message *protobufs.ServerToAgent) error {
if c.httpResponseWriter == nil {
return errInvalidHTTPConnection
}

defer func() {
if c.callbacks != nil {
// Indicate via the callback that the OpAMP Connection is closed. From OpAMP
// perspective the connection represented by this httpConnection instance
// is closed. It is not possible to send or receive more OpAMP messages
// via this httpConnection.
c.callbacks.OnConnectionClose(c)
}
}()

bytes, err := proto.Marshal(message)
if err != nil {
c.httpResponseWriter.WriteHeader(http.StatusInternalServerError)
return err
}

c.httpResponseWriter.Header().Set(headerContentType, contentTypeProtobuf)

_, err = c.httpResponseWriter.Write(bytes)

// Mark this connection as no longer valid.
c.httpResponseWriter = nil

return err
}
23 changes: 23 additions & 0 deletions server/httpconnectioncontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package server

import (
"context"
"net"
"net/http"
)

type connContextKeyType struct {
key string
}

var connContextKey = connContextKeyType{key: "http-conn"}

func contextWithConn(ctx context.Context, c net.Conn) context.Context {
// Create a new context that stores the net.Conn. For use as ConnContext func
// of http.Server to remember the connection in the context.
return context.WithValue(ctx, connContextKey, c)
}
func connFromRequest(r *http.Request) net.Conn {
// Extract the net.Conn from the context of the specified http.Request.
return r.Context().Value(connContextKey).(net.Conn)
}
67 changes: 61 additions & 6 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"errors"
"io"
"net"
"net/http"

Expand All @@ -19,6 +20,8 @@ var (
)

const defaultOpAMPPath = "/v1/opamp"
const headerContentType = "Content-Type"
const contentTypeProtobuf = "application/x-protobuf"

type server struct {
logger types.Logger
Expand Down Expand Up @@ -69,9 +72,10 @@ func (s *server) Start(settings StartSettings) error {
mux.HandleFunc(path, s.httpHandler)

hs := &http.Server{
Handler: mux,
Addr: settings.ListenEndpoint,
TLSConfig: settings.TLSConfig,
Handler: mux,
Addr: settings.ListenEndpoint,
TLSConfig: settings.TLSConfig,
ConnContext: contextWithConn,
}
s.httpServer = hs

Expand Down Expand Up @@ -143,7 +147,15 @@ func (s *server) httpHandler(w http.ResponseWriter, req *http.Request) {
}
}

// HTTP connection is accepted. Upgrade it to WebSocket.
// HTTP connection is accepted. Check if it is a plain HTTP request.

if req.Header.Get(headerContentType) == contentTypeProtobuf {
// Yes, a plain HTTP request.
s.handlePlainHTTPRequest(req, w)
return
}

// No, it is a WebSocket. Upgrade it.
conn, err := s.wsUpgrader.Upgrade(w, req, nil)
if err != nil {
s.logger.Errorf("Cannot upgrade HTTP connection to WebSocket: %v", err)
Expand All @@ -156,11 +168,16 @@ func (s *server) httpHandler(w http.ResponseWriter, req *http.Request) {
}

func (s *server) handleWSConnection(wsConn *websocket.Conn) {
agentConn := connection{wsConn: wsConn}
agentConn := wsConnection{wsConn: wsConn}

defer func() {
// Close the connection when all is done.
defer wsConn.Close()
defer func() {
err := wsConn.Close()
if err != nil {
s.logger.Errorf("error closing the WebSocket connection: %v", err)
}
}()

if s.settings.Callbacks != nil {
s.settings.Callbacks.OnConnectionClose(agentConn)
Expand Down Expand Up @@ -202,3 +219,41 @@ func (s *server) handleWSConnection(wsConn *websocket.Conn) {
}
}
}

func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter) {
bytes, err := io.ReadAll(req.Body)
if err != nil {
s.logger.Debugf("Cannot read HTTP body: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

// Decode WebSocket message as a Protobuf message.
var request protobufs.AgentToServer
err = proto.Unmarshal(bytes, &request)
if err != nil {
s.logger.Debugf("Cannot decode message from HTTP Body: %v", err)
w.WriteHeader(http.StatusBadRequest)
}

agentConn := httpConnection{
httpResponseWriter: w,
callbacks: s.settings.Callbacks,
conn: connFromRequest(req),
}

if s.settings.Callbacks == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// According to spec https://github.com/open-telemetry/opamp-spec#status-reporting
// "The Server MUST respond to the status report by sending a ServerToAgent message."
// So we don't send the HTTP response anywhere here. The OnMessage handler MUST
// respond to the message it receives by calling agentConn.Send() which in turn
// will send the HTTP response.

s.settings.Callbacks.OnConnected(agentConn)
s.settings.Callbacks.OnMessage(agentConn, &request)

}
69 changes: 69 additions & 0 deletions server/serverimpl_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package server

import (
"bytes"
"context"
"io"
"net/http"
"net/http/httptest"
"sync/atomic"
Expand Down Expand Up @@ -182,6 +184,73 @@ func TestServerReceiveSendMessage(t *testing.T) {
assert.EqualValues(t, protobufs.ServerCapabilities_AcceptsStatus, response.Capabilities)
}

func TestServerReceiveSendMessagePlainHTTP(t *testing.T) {
var rcvMsg atomic.Value
var onConnectedCalled, onCloseCalled int32
callbacks := CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{Accept: true}
},
OnConnectedFunc: func(conn types.Connection) {
atomic.StoreInt32(&onConnectedCalled, 1)
},
OnMessageFunc: func(conn types.Connection, message *protobufs.AgentToServer) {
// Remember received message.
rcvMsg.Store(message)

// Send a response.
response := protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
Capabilities: protobufs.ServerCapabilities_AcceptsStatus,
}
err := conn.Send(context.Background(), &response)
assert.NoError(t, err)
},
OnConnectionCloseFunc: func(conn types.Connection) {
atomic.StoreInt32(&onCloseCalled, 1)
},
}

// Start a server.
settings := &StartSettings{Settings: Settings{Callbacks: callbacks}}
srv := startServer(t, settings)
defer srv.Stop(context.Background())

// Send a message to the server.
sendMsg := protobufs.AgentToServer{
InstanceUid: "12345678",
}
b, err := proto.Marshal(&sendMsg)
require.NoError(t, err)
resp, err := http.Post("http://"+settings.ListenEndpoint+settings.ListenPath, contentTypeProtobuf, bytes.NewReader(b))
require.NoError(t, err)

// Wait until server receives the message.
eventually(t, func() bool { return rcvMsg.Load() != nil })
assert.True(t, atomic.LoadInt32(&onConnectedCalled) == 1)

// Verify the received message is what was sent.
assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg))

// Read server's response.
b, err = io.ReadAll(resp.Body)
require.NoError(t, err)

assert.EqualValues(t, http.StatusOK, resp.StatusCode)
assert.EqualValues(t, contentTypeProtobuf, resp.Header.Get(headerContentType))

// Decode the response.
var response protobufs.ServerToAgent
err = proto.Unmarshal(b, &response)
require.NoError(t, err)

// Verify the response.
assert.EqualValues(t, sendMsg.InstanceUid, response.InstanceUid)
assert.EqualValues(t, protobufs.ServerCapabilities_AcceptsStatus, response.Capabilities)

eventually(t, func() bool { return atomic.LoadInt32(&onCloseCalled) == 1 })
}

func TestServerAttachAcceptConnection(t *testing.T) {
connectedCalled := int32(0)
connectionCloseCalled := int32(0)
Expand Down
2 changes: 1 addition & 1 deletion server/types/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

// Connection represents one OpAMP WebSocket connections.
// Connection represents one OpAMP connection.
// The implementation MUST be a comparable type so that it can be used as a map key.
type Connection interface {
// RemoteAddr returns the remote network address of the connection.
Expand Down
9 changes: 5 additions & 4 deletions server/connection.go → server/wsconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (
"github.com/open-telemetry/opamp-go/server/types"
)

type connection struct {
// wsConnection represents a persistent OpAMP connection over a WebSocket.
type wsConnection struct {
wsConn *websocket.Conn
}

var _ types.Connection = (*connection)(nil)
var _ types.Connection = (*wsConnection)(nil)

func (c connection) RemoteAddr() net.Addr {
func (c wsConnection) RemoteAddr() net.Addr {
return c.wsConn.RemoteAddr()
}

func (c connection) Send(ctx context.Context, message *protobufs.ServerToAgent) error {
func (c wsConnection) Send(_ context.Context, message *protobufs.ServerToAgent) error {
bytes, err := proto.Marshal(message)
if err != nil {
return err
Expand Down

0 comments on commit 07090b0

Please sign in to comment.