Skip to content

Commit

Permalink
move forwarded monitor request into helper
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Nov 4, 2019
1 parent 52df108 commit b0a0cf0
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 67 deletions.
1 change: 0 additions & 1 deletion client/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

// framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()
Expand Down
136 changes: 70 additions & 66 deletions nomad/client_agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,72 +62,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {

// Targeting a client so forward the request
if args.NodeID != "" {
nodeID := args.NodeID

snap, err := m.srv.State().Snapshot()
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}

node, err := snap.NodeByID(nil, nodeID)
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}

if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}

if err := nodeSupportsRpc(node); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}

// Get the Connection to the client either by fowarding to another server
// or creating direct stream
var clientConn net.Conn
state, ok := m.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
if err != nil {
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
handleStreamResultError(err, code, encoder)
return
}
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}

clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
clientConn = stream
}
defer clientConn.Close()

// Send the Request
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
handleStreamResultError(err, nil, encoder)
return
}

structs.Bridge(conn, clientConn)
return
m.forwardMonitor(conn, args, encoder, decoder)
}

// NodeID was empty, so monitor this current server
Expand Down Expand Up @@ -234,3 +169,72 @@ OUTER:
}
}
}

func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
nodeID := args.NodeID

snap, err := m.srv.State().Snapshot()
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}

node, err := snap.NodeByID(nil, nodeID)
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}

if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}

if err := nodeSupportsRpc(node); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}

// Get the Connection to the client either by fowarding to another server
// or creating direct stream
var clientConn net.Conn
state, ok := m.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
if err != nil {
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
handleStreamResultError(err, code, encoder)
return
}
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}

clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
clientConn = stream
}
defer clientConn.Close()

// Send the Request
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
handleStreamResultError(err, nil, encoder)
return
}

structs.Bridge(conn, clientConn)
return
}

0 comments on commit b0a0cf0

Please sign in to comment.