From 20318e4fbe0334a788947e876768862313e1b54f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 7 Dec 2015 10:45:49 +0000 Subject: [PATCH 1/3] Add cancellable attach. --- client.go | 156 ++++++++++++++++++++++++++++++++------------------- container.go | 22 ++++++++ exec.go | 6 +- 3 files changed, 126 insertions(+), 58 deletions(-) diff --git a/client.go b/client.go index a5fabd15..bdcfdcc6 100644 --- a/client.go +++ b/client.go @@ -554,24 +554,37 @@ type hijackOptions struct { data interface{} } -func (c *Client) hijack(method, path string, hijackOptions hijackOptions) error { +type CloseWaiter interface { + io.Closer + Wait() error +} + +type waiterFunc func() error + +func (w waiterFunc) Wait() error { return w() } + +type closerFunc func() error + +func (c closerFunc) Close() error { return c() } + +func (c *Client) hijack(method, path string, hijackOptions hijackOptions) (CloseWaiter, error) { if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil { err := c.checkAPIVersion() if err != nil { - return err + return nil, err } } var params io.Reader if hijackOptions.data != nil { buf, err := json.Marshal(hijackOptions.data) if err != nil { - return err + return nil, err } params = bytes.NewBuffer(buf) } req, err := http.NewRequest(method, c.getURL(path), params) if err != nil { - return err + return nil, err } req.Header.Set("Content-Type", "plain/text") req.Header.Set("Connection", "Upgrade") @@ -586,74 +599,103 @@ func (c *Client) hijack(method, path string, hijackOptions hijackOptions) error if c.TLSConfig != nil && protocol != "unix" { dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig) if err != nil { - return err + return nil, err } } else { dial, err = c.Dialer.Dial(protocol, address) if err != nil { - return err + return nil, err } } - clientconn := httputil.NewClientConn(dial, nil) - defer clientconn.Close() - clientconn.Do(req) - if hijackOptions.success != nil { - hijackOptions.success <- struct{}{} - <-hijackOptions.success - } - rwc, br := clientconn.Hijack() - defer rwc.Close() - errChanOut := make(chan error, 1) - errChanIn := make(chan error, 1) - if hijackOptions.stdout == nil && hijackOptions.stderr == nil { - close(errChanOut) - } else { - // Only copy if hijackOptions.stdout and/or hijackOptions.stderr is actually set. - // Otherwise, if the only stream you care about is stdin, your attach session - // will "hang" until the container terminates, even though you're not reading - // stdout/stderr - if hijackOptions.stdout == nil { - hijackOptions.stdout = ioutil.Discard - } - if hijackOptions.stderr == nil { - hijackOptions.stderr = ioutil.Discard - } - go func() { - defer func() { - if hijackOptions.in != nil { - if closer, ok := hijackOptions.in.(io.Closer); ok { - closer.Close() + errs := make(chan error) + quit := make(chan struct{}) + go func() { + clientconn := httputil.NewClientConn(dial, nil) + defer clientconn.Close() + clientconn.Do(req) + if hijackOptions.success != nil { + hijackOptions.success <- struct{}{} + <-hijackOptions.success + } + rwc, br := clientconn.Hijack() + defer rwc.Close() + + errChanOut := make(chan error, 1) + errChanIn := make(chan error, 1) + if hijackOptions.stdout == nil && hijackOptions.stderr == nil { + close(errChanOut) + } else { + // Only copy if hijackOptions.stdout and/or hijackOptions.stderr is actually set. + // Otherwise, if the only stream you care about is stdin, your attach session + // will "hang" until the container terminates, even though you're not reading + // stdout/stderr + if hijackOptions.stdout == nil { + hijackOptions.stdout = ioutil.Discard + } + if hijackOptions.stderr == nil { + hijackOptions.stderr = ioutil.Discard + } + + go func() { + defer func() { + if hijackOptions.in != nil { + if closer, ok := hijackOptions.in.(io.Closer); ok { + closer.Close() + } + errChanIn <- nil } - errChanIn <- nil + }() + + var err error + if hijackOptions.setRawTerminal { + _, err = io.Copy(hijackOptions.stdout, br) + } else { + _, err = stdcopy.StdCopy(hijackOptions.stdout, hijackOptions.stderr, br) } + errChanOut <- err }() + } + go func() { var err error - if hijackOptions.setRawTerminal { - _, err = io.Copy(hijackOptions.stdout, br) - } else { - _, err = stdcopy.StdCopy(hijackOptions.stdout, hijackOptions.stderr, br) + if hijackOptions.in != nil { + _, err = io.Copy(rwc, hijackOptions.in) } - errChanOut <- err + errChanIn <- err + rwc.(interface { + CloseWrite() error + }).CloseWrite() }() - } - go func() { - var err error - if hijackOptions.in != nil { - _, err = io.Copy(rwc, hijackOptions.in) - } - errChanIn <- err - rwc.(interface { - CloseWrite() error - }).CloseWrite() + + var errIn error + select { + case errIn = <-errChanIn: + case <-quit: + return + } + + var errOut error + select { + case errOut = <-errChanOut: + case <-quit: + return + } + + if errIn != nil { + errs <- errIn + } else { + errs <- errOut + } }() - errIn := <-errChanIn - errOut := <-errChanOut - if errIn != nil { - return errIn - } - return errOut + + return struct { + closerFunc + waiterFunc + }{ + closerFunc(func() error { close(quit); return nil }), + waiterFunc(func() error { return <-errs }), + }, nil } func (c *Client) getURL(path string) string { diff --git a/container.go b/container.go index 1b3e17e3..f2eadd5e 100644 --- a/container.go +++ b/container.go @@ -1040,6 +1040,28 @@ func (c *Client) AttachToContainer(opts AttachToContainerOptions) error { return &NoSuchContainer{ID: opts.Container} } path := "/containers/" + opts.Container + "/attach?" + queryString(opts) + cw, err := c.hijack("POST", path, hijackOptions{ + success: opts.Success, + setRawTerminal: opts.RawTerminal, + in: opts.InputStream, + stdout: opts.OutputStream, + stderr: opts.ErrorStream, + }) + if err != nil { + return err + } + return cw.Wait() +} + +// AttachToContainerNonBlocking attaches to a container, using the given options. +// This function does not block. +// +// See https://goo.gl/NKpkFk for more details. +func (c *Client) AttachToContainerNonBlocking(opts AttachToContainerOptions) (CloseWaiter, error) { + if opts.Container == "" { + return nil, &NoSuchContainer{ID: opts.Container} + } + path := "/containers/" + opts.Container + "/attach?" + queryString(opts) return c.hijack("POST", path, hijackOptions{ success: opts.Success, setRawTerminal: opts.RawTerminal, diff --git a/exec.go b/exec.go index f3b705fa..5ff82513 100644 --- a/exec.go +++ b/exec.go @@ -101,7 +101,7 @@ func (c *Client) StartExec(id string, opts StartExecOptions) error { return nil } - return c.hijack("POST", path, hijackOptions{ + cw, err := c.hijack("POST", path, hijackOptions{ success: opts.Success, setRawTerminal: opts.RawTerminal, in: opts.InputStream, @@ -109,6 +109,10 @@ func (c *Client) StartExec(id string, opts StartExecOptions) error { stderr: opts.ErrorStream, data: opts, }) + if err != nil { + return err + } + return cw.Wait() } // ResizeExecTTY resizes the tty session used by the exec command id. This API From 33922184a6ff6ac6f97b1cd584190088bec3bde1 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 8 Dec 2015 13:33:46 +0000 Subject: [PATCH 2/3] Add StartExecNonBlocking --- exec.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/exec.go b/exec.go index 5ff82513..e164bfb6 100644 --- a/exec.go +++ b/exec.go @@ -115,6 +115,40 @@ func (c *Client) StartExec(id string, opts StartExecOptions) error { return cw.Wait() } +// StartExecNonBlocking starts a previously set up exec instance id. If opts.Detach is +// true, it returns after starting the exec command. Otherwise, it sets up an +// interactive session with the exec command. +// +// See https://goo.gl/iQCnto for more details +func (c *Client) StartExecNonBlocking(id string, opts StartExecOptions) (CloseWaiter, error) { + if id == "" { + return nil, &NoSuchExec{ID: id} + } + + path := fmt.Sprintf("/exec/%s/start", id) + + if opts.Detach { + resp, err := c.do("POST", path, doOptions{data: opts}) + if err != nil { + if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound { + return nil, &NoSuchExec{ID: id} + } + return nil, err + } + defer resp.Body.Close() + return nil, nil + } + + return c.hijack("POST", path, hijackOptions{ + success: opts.Success, + setRawTerminal: opts.RawTerminal, + in: opts.InputStream, + stdout: opts.OutputStream, + stderr: opts.ErrorStream, + data: opts, + }) +} + // ResizeExecTTY resizes the tty session used by the exec command id. This API // is valid only if Tty was specified as part of creating and starting the exec // command. From 4dd2c2146ae783fb4d17d235bf43a7a24d69e4cc Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 15 Dec 2015 18:02:18 +0000 Subject: [PATCH 3/3] Review feedback: make StartExec use StartExecNonBlocking etc --- container.go | 12 +----------- exec.go | 32 +++++--------------------------- 2 files changed, 6 insertions(+), 38 deletions(-) diff --git a/container.go b/container.go index f2eadd5e..a8f847d1 100644 --- a/container.go +++ b/container.go @@ -1036,17 +1036,7 @@ type AttachToContainerOptions struct { // // See https://goo.gl/NKpkFk for more details. func (c *Client) AttachToContainer(opts AttachToContainerOptions) error { - if opts.Container == "" { - return &NoSuchContainer{ID: opts.Container} - } - path := "/containers/" + opts.Container + "/attach?" + queryString(opts) - cw, err := c.hijack("POST", path, hijackOptions{ - success: opts.Success, - setRawTerminal: opts.RawTerminal, - in: opts.InputStream, - stdout: opts.OutputStream, - stderr: opts.ErrorStream, - }) + cw, err := c.AttachToContainerNonBlocking(opts) if err != nil { return err } diff --git a/exec.go b/exec.go index e164bfb6..1a16da9d 100644 --- a/exec.go +++ b/exec.go @@ -83,36 +83,14 @@ type StartExecOptions struct { // // See https://goo.gl/iQCnto for more details func (c *Client) StartExec(id string, opts StartExecOptions) error { - if id == "" { - return &NoSuchExec{ID: id} - } - - path := fmt.Sprintf("/exec/%s/start", id) - - if opts.Detach { - resp, err := c.do("POST", path, doOptions{data: opts}) - if err != nil { - if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound { - return &NoSuchExec{ID: id} - } - return err - } - defer resp.Body.Close() - return nil - } - - cw, err := c.hijack("POST", path, hijackOptions{ - success: opts.Success, - setRawTerminal: opts.RawTerminal, - in: opts.InputStream, - stdout: opts.OutputStream, - stderr: opts.ErrorStream, - data: opts, - }) + cw, err := c.StartExecNonBlocking(id, opts) if err != nil { return err } - return cw.Wait() + if cw != nil { + return cw.Wait() + } + return nil } // StartExecNonBlocking starts a previously set up exec instance id. If opts.Detach is