Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Commit

Permalink
Close open connections when changing primary
Browse files Browse the repository at this point in the history
  • Loading branch information
Wayne Song committed Oct 20, 2016
1 parent 60932b5 commit b0609bc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
3 changes: 2 additions & 1 deletion api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,8 @@ func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.Replace(r.URL.Path, name, container.ID, 1)
}

err = hijack(c.tlsConfig, container.Engine.Addr, w, r)
stopCh := make(chan struct{})
err = hijack(c.tlsConfig, container.Engine.Addr, w, r, stopCh)
container.Engine.CheckConnectionErr(err)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
Expand Down
16 changes: 9 additions & 7 deletions api/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@ var localRoutes = []string{"/info", "/_ping", "/debug"}

// Replica is an API replica that reserves proxy to the primary.
type Replica struct {
handler http.Handler
tlsConfig *tls.Config
primary string
handler http.Handler
tlsConfig *tls.Config
primary string
primaryChanged chan struct{}
}

// NewReplica creates a new API replica.
func NewReplica(handler http.Handler, tlsConfig *tls.Config) *Replica {
return &Replica{
handler: handler,
tlsConfig: tlsConfig,
handler: handler,
tlsConfig: tlsConfig,
primaryChanged: make(chan struct{}),
}
}

// SetPrimary sets the address of the primary Swarm manager
func (p *Replica) SetPrimary(primary string) {
// FIXME: We have to kill current connections before doing this.
p.primary = primary
p.primaryChanged <- struct{}{}
}

// ServeHTTP is the http.Handler.
Expand All @@ -46,7 +48,7 @@ func (p *Replica) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if err := hijack(p.tlsConfig, p.primary, w, r); err != nil {
if err := hijack(p.tlsConfig, p.primary, w, r, p.primaryChanged); err != nil {
httpError(w, fmt.Sprintf("Unable to reach primary cluster manager (%s): %v", err, p.primary), http.StatusInternalServerError)
}
}
17 changes: 16 additions & 1 deletion api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func dialHijack(tlsConfig *tls.Config, addr string) (net.Conn, error) {
return tlsDialWithDialer(new(net.Dialer), "tcp", addr, tlsConfig)
}

func hijack(tlsConfig *tls.Config, addr string, w http.ResponseWriter, r *http.Request) error {
func hijack(tlsConfig *tls.Config, addr string, w http.ResponseWriter, r *http.Request, stopCh chan struct{}) error {
if parts := strings.SplitN(addr, "://", 2); len(parts) == 2 {
addr = parts[1]
}
Expand Down Expand Up @@ -297,6 +297,20 @@ func hijack(tlsConfig *tls.Config, addr string, w http.ResponseWriter, r *http.R
go cp(d, nc, inDone)
go cp(nc, d, outDone)

closeWatch := func(dst io.Writer, cancel chan struct{}) {
select {
case <-cancel:
if conn, ok := dst.(interface {
CloseWrite() error
}); ok {
conn.CloseWrite()
}
close(inDone)
close(outDone)
}
}
go closeWatch(d, stopCh)

// 1. When stdin is done, wait for stdout always
// 2. When stdout is done, close the stream and wait for stdin to finish
//
Expand All @@ -313,6 +327,7 @@ func hijack(tlsConfig *tls.Config, addr string, w http.ResponseWriter, r *http.R
nc.Close()
<-inDone
}
<-stopCh
return nil
}

Expand Down

0 comments on commit b0609bc

Please sign in to comment.