-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cli: recover from client ACL lookup failures #6423
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF | |
// ReadAt is used to read bytes at a given offset until limit at the given path | ||
// in an allocation directory. If limit is <= 0, there is no limit. | ||
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) { | ||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if q == nil { | ||
q = &QueryOptions{} | ||
} | ||
if q.Params == nil { | ||
q.Params = make(map[string]string) | ||
} | ||
|
||
q.Params["path"] = path | ||
q.Params["offset"] = strconv.FormatInt(offset, 10) | ||
q.Params["limit"] = strconv.FormatInt(limit, 10) | ||
|
||
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID) | ||
r, err := nodeClient.rawQuery(reqPath, q) | ||
if err != nil { | ||
// There was a networking error when talking directly to the client. | ||
if _, ok := err.(net.Error); !ok { | ||
return nil, err | ||
} | ||
|
||
// Try via the server | ||
r, err = a.client.rawQuery(reqPath, q) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return r, nil | ||
return queryClientNode(a.client, alloc, reqPath, q, | ||
func(q *QueryOptions) { | ||
q.Params["path"] = path | ||
q.Params["offset"] = strconv.FormatInt(offset, 10) | ||
q.Params["limit"] = strconv.FormatInt(limit, 10) | ||
}) | ||
} | ||
|
||
// Cat is used to read contents of a file at the given path in an allocation | ||
// directory | ||
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) { | ||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if q == nil { | ||
q = &QueryOptions{} | ||
} | ||
if q.Params == nil { | ||
q.Params = make(map[string]string) | ||
} | ||
|
||
q.Params["path"] = path | ||
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID) | ||
r, err := nodeClient.rawQuery(reqPath, q) | ||
if err != nil { | ||
// There was a networking error when talking directly to the client. | ||
if _, ok := err.(net.Error); !ok { | ||
return nil, err | ||
} | ||
|
||
// Try via the server | ||
r, err = a.client.rawQuery(reqPath, q) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return r, nil | ||
return queryClientNode(a.client, alloc, reqPath, q, | ||
func(q *QueryOptions) { | ||
q.Params["path"] = path | ||
}) | ||
} | ||
|
||
// Stream streams the content of a file blocking on EOF. | ||
|
@@ -172,38 +124,17 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, | |
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { | ||
|
||
errCh := make(chan error, 1) | ||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) | ||
if err != nil { | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
|
||
if q == nil { | ||
q = &QueryOptions{} | ||
} | ||
if q.Params == nil { | ||
q.Params = make(map[string]string) | ||
} | ||
|
||
q.Params["path"] = path | ||
q.Params["offset"] = strconv.FormatInt(offset, 10) | ||
q.Params["origin"] = origin | ||
|
||
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID) | ||
r, err := nodeClient.rawQuery(reqPath, q) | ||
r, err := queryClientNode(a.client, alloc, reqPath, q, | ||
func(q *QueryOptions) { | ||
q.Params["path"] = path | ||
q.Params["offset"] = strconv.FormatInt(offset, 10) | ||
q.Params["origin"] = origin | ||
}) | ||
if err != nil { | ||
// There was a networking error when talking directly to the client. | ||
if _, ok := err.(net.Error); !ok { | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
|
||
// Try via the server | ||
r, err = a.client.rawQuery(reqPath, q) | ||
if err != nil { | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
|
||
// Create the output channel | ||
|
@@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, | |
return frames, errCh | ||
} | ||
|
||
func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) { | ||
nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way we could debug log this error? I see a future engineer trying to debug why an api call isn't able to hit the client directly and this error would probably be the key bit of knowledge. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure how to best handle it. We currently don't log anything in api package, and introducing logging might be confusing for cli tools, specially if eventually requests succeeds through server. |
||
|
||
if q == nil { | ||
q = &QueryOptions{} | ||
} | ||
if q.Params == nil { | ||
q.Params = make(map[string]string) | ||
} | ||
if customizeQ != nil { | ||
customizeQ(q) | ||
} | ||
|
||
var r io.ReadCloser | ||
var err error | ||
|
||
if nodeClient != nil { | ||
r, err = nodeClient.rawQuery(reqPath, q) | ||
if _, ok := err.(net.Error); err != nil && !ok { | ||
// found a non networking error talking to client directly | ||
return nil, err | ||
} | ||
|
||
} | ||
|
||
// failed to query node, access through server directly | ||
// or network error when talking to the client directly | ||
if r == nil { | ||
return c.rawQuery(reqPath, q) | ||
} | ||
|
||
return r, err | ||
} | ||
|
||
// Logs streams the content of a tasks logs blocking on EOF. | ||
// The parameters are: | ||
// * allocation: the allocation to stream from. | ||
|
@@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str | |
|
||
errCh := make(chan error, 1) | ||
|
||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) | ||
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID) | ||
r, err := queryClientNode(a.client, alloc, reqPath, q, | ||
func(q *QueryOptions) { | ||
q.Params["follow"] = strconv.FormatBool(follow) | ||
q.Params["task"] = task | ||
q.Params["type"] = logType | ||
q.Params["origin"] = origin | ||
q.Params["offset"] = strconv.FormatInt(offset, 10) | ||
}) | ||
if err != nil { | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
|
||
if q == nil { | ||
q = &QueryOptions{} | ||
} | ||
if q.Params == nil { | ||
q.Params = make(map[string]string) | ||
} | ||
|
||
q.Params["follow"] = strconv.FormatBool(follow) | ||
q.Params["task"] = task | ||
q.Params["type"] = logType | ||
q.Params["origin"] = origin | ||
q.Params["offset"] = strconv.FormatInt(offset, 10) | ||
|
||
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID) | ||
r, err := nodeClient.rawQuery(reqPath, q) | ||
if err != nil { | ||
// There was a networking error when talking directly to the client. | ||
if _, ok := err.(net.Error); !ok { | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
|
||
// Try via the server | ||
r, err = a.client.rawQuery(reqPath, q) | ||
if err != nil { | ||
errCh <- err | ||
return nil, errCh | ||
} | ||
} | ||
|
||
// Create the output channel | ||
frames := make(chan *StreamFrame, 10) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See other comment about logging this error