Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client RPC Endpoints, Server Routing and Streaming RPCs #3892

Merged
merged 80 commits into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
aaf883b
Helper to populate RPC server endpoints
dadgar Jan 3, 2018
96587f2
Dynamic RPC servers with context
dadgar Jan 4, 2018
4c9caff
Improve TLS cluster testing
dadgar Jan 5, 2018
052639e
Track client connections
dadgar Jan 5, 2018
9be9847
Store the whole verified certificate chain
dadgar Jan 11, 2018
9680e79
Conn Pool can emit new connections to listeners.
dadgar Jan 11, 2018
0b6e4e2
Pull inmem codec to helper
dadgar Jan 11, 2018
ddee97c
Stats Endpoint
dadgar Jan 11, 2018
784f68e
RPC Listener
dadgar Jan 11, 2018
298b1c4
Server can forward ClientStats.Stats
dadgar Jan 11, 2018
1611fe5
Add testing interfaces
dadgar Jan 12, 2018
d15bb76
Refactor
dadgar Jan 12, 2018
9c570da
Test RPC from server
dadgar Jan 12, 2018
c9ddf89
Test http
dadgar Jan 12, 2018
1d9b252
Use nomad UUID
dadgar Jan 13, 2018
a6baf71
Remove testing
dadgar Jan 15, 2018
96c3543
Store connection time
dadgar Jan 12, 2018
61eaa10
Add RPC for querying for Node connections
dadgar Jan 13, 2018
5c6b6b8
Forwarding
dadgar Jan 15, 2018
13e4564
code review
dadgar Jan 23, 2018
b97b44a
use server manager
dadgar Jan 9, 2018
0a047bb
SetServer command actually returns an error if given an invalid server
dadgar Jan 10, 2018
5c1ba8f
Change defaults for min use duration
dadgar Jan 10, 2018
6860037
Plumb config
dadgar Jan 10, 2018
5618fd4
initial round of comment review
dadgar Jan 25, 2018
fa6c90a
Unjankify the pkg
dadgar Jan 25, 2018
eade7ff
Fix lint/comments
dadgar Jan 25, 2018
e2d1ce8
Fix manager tests and make testagent recover from port conflicts
dadgar Jan 26, 2018
ebce3f9
fix lint
dadgar Jan 26, 2018
496ab5f
Remove circular dependency
dadgar Jan 26, 2018
e69506a
Use in-mem rpc
dadgar Jan 26, 2018
70ff5af
wip fs endpoint
dadgar Jan 17, 2018
c0e01d8
New RPC Modes and basic setup for streaming RPC handlers
dadgar Jan 19, 2018
5e7a1a4
Logs over RPC w/ lots to touch up
dadgar Jan 21, 2018
d9722fa
Server streaming
dadgar Jan 22, 2018
9d479f3
test stream framer
dadgar Jan 26, 2018
2b6a7eb
Remove logging
dadgar Jan 26, 2018
8e557c9
Refactor client RPCs from server
dadgar Jan 27, 2018
c677cf8
Forwarding
dadgar Jan 30, 2018
a9ed7a8
Server tests of logs
dadgar Jan 31, 2018
c76b311
client tests
dadgar Jan 31, 2018
7e5a30d
Agent logs
dadgar Feb 1, 2018
3c689ba
Client Stat/List impl
dadgar Feb 1, 2018
a1eff9d
Server stat/list impl
dadgar Feb 1, 2018
69dc065
Client implementation of stream
dadgar Feb 1, 2018
b954114
Server implementation of stream
dadgar Feb 1, 2018
d77b366
HTTP and tests
dadgar Feb 5, 2018
b257812
move error
dadgar Feb 5, 2018
3cbd7e8
vet
dadgar Feb 5, 2018
ac1a0de
remove changes to the demo clients
dadgar Feb 5, 2018
0706f4e
Add Streaming RPC ack
dadgar Feb 6, 2018
99c0bdf
Implement MultiplexV2 RPC handling
dadgar Feb 6, 2018
715006a
Document server handling of client endpoints
dadgar Feb 7, 2018
3e41086
Respond to comments
dadgar Feb 8, 2018
ff79fbc
Streaming helper
dadgar Feb 8, 2018
b0d0359
clarify force
dadgar Feb 9, 2018
a80ef65
Code review feedback
dadgar Feb 13, 2018
4ac1e25
Refactor determining the handler for a node id call
dadgar Feb 5, 2018
ce37dee
client implementation of alloc gc and stats
dadgar Feb 6, 2018
3494850
Server side impl + touch ups
dadgar Feb 6, 2018
0fb2b5c
HTTP agent
dadgar Feb 6, 2018
e05cd42
Use helper for forwarding
dadgar Feb 8, 2018
4f332ff
feedback and rebasing
dadgar Feb 13, 2018
de727a6
fix flaky gc tests
dadgar Feb 14, 2018
4f2725b
Enhance API pkg to utilize Server's Client Tunnel
dadgar Feb 7, 2018
2a2e183
Update client2.hcl
dadgar Feb 7, 2018
dab5dc7
allow setting timeout on any api config
dadgar Feb 14, 2018
0d001d0
Fix incorrect deletion of node conn
dadgar Feb 7, 2018
fa044de
fix test
dadgar Feb 14, 2018
aef31b7
improve test
dadgar Feb 14, 2018
d47129f
add logging
dadgar Feb 14, 2018
c4ef9a2
doc improvements
dadgar Feb 14, 2018
153d1e5
Server TLS
dadgar Feb 15, 2018
f062c93
Client tls
dadgar Feb 15, 2018
61ac63e
fix unknown rpc tests
dadgar Feb 15, 2018
dcb13d2
Fix autopilot tests
dadgar Feb 15, 2018
3be9940
remove tmp file
dadgar Feb 15, 2018
efa8bc0
Fix original client server list behavior
dadgar Feb 16, 2018
cfd02bc
vet
dadgar Feb 16, 2018
42a920a
Merge pull request #3877 from hashicorp/f-tls
dadgar Feb 21, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,9 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, err
}

var resp AllocResourceUsage
_, err = nodeClient.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil)
path := fmt.Sprintf("/v1/client/allocation/%s/stats", alloc.ID)
_, err := a.client.query(path, &resp, q)
return &resp, err
}

Expand Down
54 changes: 52 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -18,6 +19,13 @@ import (
rootcerts "github.com/hashicorp/go-rootcerts"
)

var (
// ClientConnTimeout is the timeout applied when attempting to contact a
// client directly before switching to a connection through the Nomad
// server.
ClientConnTimeout = 1 * time.Second
)

// QueryOptions are used to parameterize a query
type QueryOptions struct {
// Providing a datacenter overwrites the region provided
Expand Down Expand Up @@ -145,6 +153,8 @@ func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config {
WaitTime: c.WaitTime,
TLSConfig: c.TLSConfig.Copy(),
}

// Update the tls server name for connecting to a client
if tlsEnabled && config.TLSConfig != nil {
config.TLSConfig.TLSServerName = fmt.Sprintf("client.%s.nomad", region)
}
Expand Down Expand Up @@ -249,6 +259,34 @@ func DefaultConfig() *Config {
return config
}

// SetTimeout is used to place a timeout for connecting to Nomad. A negative
// duration is ignored, a duration of zero means no timeout, and any other value
// will add a timeout.
func (c *Config) SetTimeout(t time.Duration) error {
if c == nil {
return fmt.Errorf("nil config")
} else if c.httpClient == nil {
return fmt.Errorf("nil HTTP client")
} else if c.httpClient.Transport == nil {
return fmt.Errorf("nil HTTP client transport")
}

// Apply a timeout.
if t.Nanoseconds() >= 0 {
transport, ok := c.httpClient.Transport.(*http.Transport)
if !ok {
return fmt.Errorf("unexpected HTTP transport: %T", c.httpClient.Transport)
}

transport.DialContext = (&net.Dialer{
Timeout: t,
KeepAlive: 30 * time.Second,
}).DialContext
}

return nil
}

// ConfigureTLS applies a set of TLS configurations to the the HTTP client.
func (c *Config) ConfigureTLS() error {
if c.TLSConfig == nil {
Expand Down Expand Up @@ -343,7 +381,15 @@ func (c *Client) SetNamespace(namespace string) {
// GetNodeClient returns a new Client that will dial the specified node. If the
// QueryOptions is set, its region will be used.
func (c *Client) GetNodeClient(nodeID string, q *QueryOptions) (*Client, error) {
return c.getNodeClientImpl(nodeID, q, c.Nodes().Info)
return c.getNodeClientImpl(nodeID, -1, q, c.Nodes().Info)
}

// GetNodeClientWithTimeout returns a new Client that will dial the specified
// node using the specified timeout. If the QueryOptions is set, its region will
// be used.
func (c *Client) GetNodeClientWithTimeout(
nodeID string, timeout time.Duration, q *QueryOptions) (*Client, error) {
return c.getNodeClientImpl(nodeID, timeout, q, c.Nodes().Info)
}

// nodeLookup is the definition of a function used to lookup a node. This is
Expand All @@ -353,7 +399,7 @@ type nodeLookup func(nodeID string, q *QueryOptions) (*Node, *QueryMeta, error)
// getNodeClientImpl is the implementation of creating a API client for
// contacting a node. It takes a function to lookup the node such that it can be
// mocked during tests.
func (c *Client) getNodeClientImpl(nodeID string, q *QueryOptions, lookup nodeLookup) (*Client, error) {
func (c *Client) getNodeClientImpl(nodeID string, timeout time.Duration, q *QueryOptions, lookup nodeLookup) (*Client, error) {
node, _, err := lookup(nodeID, q)
if err != nil {
return nil, err
Expand All @@ -380,6 +426,10 @@ func (c *Client) getNodeClientImpl(nodeID string, q *QueryOptions, lookup nodeLo

// Get an API client for the node
conf := c.config.ClientConfig(region, node.HTTPAddr, node.TLSEnabled)

// Set the timeout
conf.SetTimeout(timeout)

return NewClient(conf)
}

Expand Down
2 changes: 1 addition & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestClient_NodeClient(t *testing.T) {
name := fmt.Sprintf("%s__%s__%s", c.ExpectedAddr, c.ExpectedRegion, c.ExpectedTLSServerName)
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
nodeClient, err := c.Client.getNodeClientImpl("testID", c.QueryOptions, c.Node)
nodeClient, err := c.Client.getNodeClientImpl("testID", -1, c.QueryOptions, c.Node)
assert.Nil(err)
assert.Equal(c.ExpectedRegion, nodeClient.config.Region)
assert.Equal(c.ExpectedAddr, nodeClient.config.Address)
Expand Down
89 changes: 61 additions & 28 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -51,22 +52,16 @@ func (c *Client) AllocFS() *AllocFS {

// List is used to list the files at a given path of an allocation directory
func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path

var resp []*AllocFileInfo
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
if err != nil {
return nil, nil, err
}
Expand All @@ -76,11 +71,6 @@ func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*Allo

// Stat is used to stat a file at a given path of an allocation directory
func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if q == nil {
q = &QueryOptions{}
}
Expand All @@ -91,7 +81,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
q.Params["path"] = path

var resp AllocFileInfo
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
if err != nil {
return nil, nil, err
}
Expand All @@ -101,7 +91,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}
Expand All @@ -117,17 +107,28 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
return nil, err
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
}

// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}
Expand All @@ -140,11 +141,21 @@ func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadC
}

q.Params["path"] = path

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
return nil, err
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
}

Expand All @@ -160,7 +171,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
Expand All @@ -177,10 +188,21 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
Expand Down Expand Up @@ -236,7 +258,7 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
Expand All @@ -255,10 +277,21 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
Expand Down
22 changes: 12 additions & 10 deletions api/nodes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"fmt"
"sort"
"strconv"
)
Expand Down Expand Up @@ -72,25 +73,26 @@ func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMet
}

func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
nodeClient, err := n.client.GetNodeClient(nodeID, q)
if err != nil {
return nil, err
}
var resp HostStats
if _, err := nodeClient.query("/v1/client/stats", &resp, nil); err != nil {
path := fmt.Sprintf("/v1/client/stats?node_id=%s", nodeID)
if _, err := n.client.query(path, &resp, q); err != nil {
return nil, err
}
return &resp, nil
}

func (n *Nodes) GC(nodeID string, q *QueryOptions) error {
nodeClient, err := n.client.GetNodeClient(nodeID, q)
if err != nil {
return err
}
var resp struct{}
path := fmt.Sprintf("/v1/client/gc?node_id=%s", nodeID)
_, err := n.client.query(path, &resp, q)
return err
}

// TODO Add tests
func (n *Nodes) GcAlloc(allocID string, q *QueryOptions) error {
var resp struct{}
_, err = nodeClient.query("/v1/client/gc", &resp, nil)
path := fmt.Sprintf("/v1/client/allocation/%s/gc", allocID)
_, err := n.client.query(path, &resp, q)
return err
}

Expand Down
27 changes: 27 additions & 0 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestNodes_List(t *testing.T) {
Expand Down Expand Up @@ -275,3 +278,27 @@ func TestNodes_Sort(t *testing.T) {
t.Fatalf("\n\n%#v\n\n%#v", nodes, expect)
}
}

func TestNodes_GC(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

err := nodes.GC(uuid.Generate(), nil)
require.NotNil(err)
require.True(structs.IsErrUnknownNode(err))
}

func TestNodes_GcAlloc(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

err := nodes.GcAlloc(uuid.Generate(), nil)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
Loading