Skip to content

Commit

Permalink
http2: connection pool periodically sends ping frame and closes the
Browse files Browse the repository at this point in the history
connection if the ping is not responded on time.

DO NOT SUBMIT

Updates golang/go#31643
  • Loading branch information
Chao Xu committed Sep 30, 2019
1 parent aa69164 commit bc0d6c6
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
53 changes: 53 additions & 0 deletions http2/client_conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
package http2

import (
"context"
"crypto/tls"
"net/http"
"sync"
"time"
)

// ClientConnPool manages a pool of HTTP/2 client connections.
Expand Down Expand Up @@ -41,6 +43,16 @@ type clientConnPool struct {
dialing map[string]*dialCall // currently in-flight dials
keys map[*ClientConn][]string
addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls

// TODO: figure out a way to allow user to configure pingPeriod and
// pingTimeout.
pingPeriod time.Duration // how often pings are sent on idle
// connections. The connection will be closed if response is not
// received within pingTimeout. 0 means no periodic pings.
pingTimeout time.Duration // connection will be force closed if a Ping
// response is not received within pingTimeout.
pingStops map[*ClientConn]chan struct{} // channels to stop the
// periodic Pings.
}

func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
Expand Down Expand Up @@ -219,13 +231,54 @@ func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
if p.keys == nil {
p.keys = make(map[*ClientConn][]string)
}
if p.pingStops == nil {
p.pingStops = make(map[*ClientConn]chan struct{})
}
p.conns[key] = append(p.conns[key], cc)
p.keys[cc] = append(p.keys[cc], key)
if p.pingPeriod != 0 {
p.pingStops[cc] = p.pingConnection(key, cc)
}
}

// TODO: ping all connections at the same tick to save tickers?
func (p *clientConnPool) pingConnection(key string, cc *ClientConn) chan struct{} {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(p.pingPeriod)
defer ticker.Stop()
for {
select {
case <-done:
return
default:
}

select {
case <-done:
return
case <-ticker.C:
ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout)
err := cc.Ping(ctx)
if err != nil {
cc.closeForLostPing()
p.MarkDead(cc)
}
}
}
}()
return done
}

func (p *clientConnPool) MarkDead(cc *ClientConn) {
p.mu.Lock()
defer p.mu.Unlock()

if done, ok := p.pingStops[cc]; ok {
close(done)
delete(p.pingStops, cc)
}

for _, key := range p.keys[cc] {
vv, ok := p.conns[key]
if !ok {
Expand Down
26 changes: 21 additions & 5 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func ConfigureTransport(t1 *http.Transport) error {

func configureTransport(t1 *http.Transport) (*Transport, error) {
connPool := new(clientConnPool)
// TODO: figure out a way to allow user to configure pingPeriod and
// pingTimeout.
connPool.pingPeriod = 5 * time.Second
connPool.pingTimeout = 1 * time.Second
t2 := &Transport{
ConnPool: noDialClientConnPool{connPool},
t1: t1,
Expand Down Expand Up @@ -834,14 +838,12 @@ func (cc *ClientConn) sendGoAway() error {
return nil
}

// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
// closes the client connection immediately. In-flight requests are interrupted.
// err is sent to streams.
func (cc *ClientConn) closeForError(err error) error {
cc.mu.Lock()
defer cc.cond.Broadcast()
defer cc.mu.Unlock()
err := errors.New("http2: client connection force closed via ClientConn.Close")
for id, cs := range cc.streams {
select {
case cs.resc <- resAndError{err: err}:
Expand All @@ -854,6 +856,20 @@ func (cc *ClientConn) Close() error {
return cc.tconn.Close()
}

// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
err := errors.New("http2: client connection force closed via ClientConn.Close")
return cc.closeForError(err)
}

// closes the client connection immediately. In-flight requests are interrupted.
func (cc *ClientConn) closeForLostPing() error {
err := errors.New("http2: client connection force closed because ping frame is not responded")
return cc.closeForError(err)
}

const maxAllocFrameSize = 512 << 10

// frameBuffer returns a scratch buffer suitable for writing DATA frames.
Expand Down
26 changes: 26 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3244,6 +3244,32 @@ func TestTransportNoRaceOnRequestObjectAfterRequestComplete(t *testing.T) {
req.Header = http.Header{}
}

func TestTransportCloseAfterLostPing(t *testing.T) {
clientDone := make(chan struct{})
ct := newClientTester(t)
connPool := new(clientConnPool)
connPool.pingPeriod = 1 * time.Second
connPool.pingTimeout = 100 * time.Millisecond
connPool.t = ct.tr
ct.tr.ConnPool = connPool
ct.client = func() error {
defer ct.cc.(*net.TCPConn).CloseWrite()
defer close(clientDone)
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
_, err := ct.tr.RoundTrip(req)
if err == nil || !strings.Contains(err.Error(), "ping frame is not responded") {
return fmt.Errorf("expected to get error about \"ping frame is not responded\", got %v", err)
}
return nil
}
ct.server = func() error {
ct.greet()
<-clientDone
return nil
}
ct.run()
}

func TestTransportRetryAfterGOAWAY(t *testing.T) {
var dialer struct {
sync.Mutex
Expand Down

0 comments on commit bc0d6c6

Please sign in to comment.