diff --git a/api/allocations.go b/api/allocations.go index 3a22af78e8c..05892a77e05 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -210,12 +210,7 @@ func (a *Allocations) Exec(ctx context.Context, func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string, errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) { - - nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) - if err != nil { - errCh <- err - return nil, nil - } + nodeClient, _ := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) if q == nil { q = &QueryOptions{} @@ -236,15 +231,17 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID) - conn, _, err := nodeClient.websocket(reqPath, q) - if err != nil { - // There was an error talking directly to the client. Non-network - // errors are fatal, but network errors can attempt to route via RPC. - if _, ok := err.(net.Error); !ok { + var conn *websocket.Conn + + if nodeClient != nil { + conn, _, err = nodeClient.websocket(reqPath, q) + if _, ok := err.(net.Error); err != nil && !ok { errCh <- err return nil, nil } + } + if conn == nil { conn, _, err = a.client.websocket(reqPath, q) if err != nil { errCh <- err diff --git a/api/fs.go b/api/fs.go index b769236f76f..e5340c9668b 100644 --- a/api/fs.go +++ b/api/fs.go @@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF // ReadAt is used to read bytes at a given offset until limit at the given path // in an allocation directory. If limit is <= 0, there is no limit. func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) { - nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) - if err != nil { - return nil, err - } - - if q == nil { - q = &QueryOptions{} - } - if q.Params == nil { - q.Params = make(map[string]string) - } - - q.Params["path"] = path - q.Params["offset"] = strconv.FormatInt(offset, 10) - q.Params["limit"] = strconv.FormatInt(limit, 10) - reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID) - r, err := nodeClient.rawQuery(reqPath, q) - if err != nil { - // There was a networking error when talking directly to the client. - if _, ok := err.(net.Error); !ok { - return nil, err - } - // Try via the server - r, err = a.client.rawQuery(reqPath, q) - if err != nil { - return nil, err - } - } - - return r, nil + return queryClientNode(a.client, alloc, reqPath, q, + func(q *QueryOptions) { + q.Params["path"] = path + q.Params["offset"] = strconv.FormatInt(offset, 10) + q.Params["limit"] = strconv.FormatInt(limit, 10) + }) } // Cat is used to read contents of a file at the given path in an allocation // directory func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) { - nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) - if err != nil { - return nil, err - } - - if q == nil { - q = &QueryOptions{} - } - if q.Params == nil { - q.Params = make(map[string]string) - } - - q.Params["path"] = path reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID) - r, err := nodeClient.rawQuery(reqPath, q) - if err != nil { - // There was a networking error when talking directly to the client. - if _, ok := err.(net.Error); !ok { - return nil, err - } - - // Try via the server - r, err = a.client.rawQuery(reqPath, q) - if err != nil { - return nil, err - } - } - - return r, nil + return queryClientNode(a.client, alloc, reqPath, q, + func(q *QueryOptions) { + q.Params["path"] = path + }) } // Stream streams the content of a file blocking on EOF. @@ -172,38 +124,17 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { errCh := make(chan error, 1) - nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) - if err != nil { - errCh <- err - return nil, errCh - } - - if q == nil { - q = &QueryOptions{} - } - if q.Params == nil { - q.Params = make(map[string]string) - } - - q.Params["path"] = path - q.Params["offset"] = strconv.FormatInt(offset, 10) - q.Params["origin"] = origin reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID) - r, err := nodeClient.rawQuery(reqPath, q) + r, err := queryClientNode(a.client, alloc, reqPath, q, + func(q *QueryOptions) { + q.Params["path"] = path + q.Params["offset"] = strconv.FormatInt(offset, 10) + q.Params["origin"] = origin + }) if err != nil { - // There was a networking error when talking directly to the client. - if _, ok := err.(net.Error); !ok { - errCh <- err - return nil, errCh - } - - // Try via the server - r, err = a.client.rawQuery(reqPath, q) - if err != nil { - errCh <- err - return nil, errCh - } + errCh <- err + return nil, errCh } // Create the output channel @@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, return frames, errCh } +func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) { + nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) + + if q == nil { + q = &QueryOptions{} + } + if q.Params == nil { + q.Params = make(map[string]string) + } + if customizeQ != nil { + customizeQ(q) + } + + var r io.ReadCloser + var err error + + if nodeClient != nil { + r, err = nodeClient.rawQuery(reqPath, q) + if _, ok := err.(net.Error); err != nil && !ok { + // found a non networking error talking to client directly + return nil, err + } + + } + + // failed to query node, access through server directly + // or network error when talking to the client directly + if r == nil { + return c.rawQuery(reqPath, q) + } + + return r, err +} + // Logs streams the content of a tasks logs blocking on EOF. // The parameters are: // * allocation: the allocation to stream from. @@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str errCh := make(chan error, 1) - nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) + reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID) + r, err := queryClientNode(a.client, alloc, reqPath, q, + func(q *QueryOptions) { + q.Params["follow"] = strconv.FormatBool(follow) + q.Params["task"] = task + q.Params["type"] = logType + q.Params["origin"] = origin + q.Params["offset"] = strconv.FormatInt(offset, 10) + }) if err != nil { errCh <- err return nil, errCh } - if q == nil { - q = &QueryOptions{} - } - if q.Params == nil { - q.Params = make(map[string]string) - } - - q.Params["follow"] = strconv.FormatBool(follow) - q.Params["task"] = task - q.Params["type"] = logType - q.Params["origin"] = origin - q.Params["offset"] = strconv.FormatInt(offset, 10) - - reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID) - r, err := nodeClient.rawQuery(reqPath, q) - if err != nil { - // There was a networking error when talking directly to the client. - if _, ok := err.(net.Error); !ok { - errCh <- err - return nil, errCh - } - - // Try via the server - r, err = a.client.rawQuery(reqPath, q) - if err != nil { - errCh <- err - return nil, errCh - } - } - // Create the output channel frames := make(chan *StreamFrame, 10)