Skip to content

Commit

Permalink
net/http2: perform connection health check
Browse files Browse the repository at this point in the history
After the connection has been idle for a while, periodic pings are sent
over the connection to check its health. Unhealthy connection is closed
and removed from the connection pool.

Fixes golang/go#31643
  • Loading branch information
Chao Xu committed Feb 22, 2020
1 parent aa69164 commit c5e6610
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 5 deletions.
66 changes: 61 additions & 5 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ type Transport struct {
// waiting for their turn.
StrictMaxConcurrentStreams bool

// PingTimeout is the timeout after which the connection will be closed
// if a response to Ping is not received.
// Defaults to 15s.
PingTimeout time.Duration

// ReadIdleTimeout is the timeout after which a health check using ping
// frame will be carried out if no frame is received on the connection.
// Note that a ping response will is considered a received frame, so if
// there is no other traffic on the connection, the health check will
// be performed every ReadIdleTimeout interval.
// If zero, no health check is performed.
ReadIdleTimeout time.Duration

// t1, if non-nil, is the standard library Transport using
// this transport. Its settings are used (but not its
// RoundTrip method, etc).
Expand All @@ -131,6 +144,14 @@ func (t *Transport) disableCompression() bool {
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}

func (t *Transport) pingTimeout() time.Duration {
if t.PingTimeout == 0 {
return 15 * time.Second
}
return t.PingTimeout

}

// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns an error if t1 has already been HTTP/2-enabled.
func ConfigureTransport(t1 *http.Transport) error {
Expand Down Expand Up @@ -674,6 +695,20 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
return cc, nil
}

func (cc *ClientConn) healthCheck() {
pingTimeout := cc.t.pingTimeout()
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
defer cancel()
err := cc.Ping(ctx)
if err != nil {
cc.closeForLostPing()
cc.t.connPool().MarkDead(cc)
return
}
}

func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
cc.mu.Lock()
defer cc.mu.Unlock()
Expand Down Expand Up @@ -834,14 +869,12 @@ func (cc *ClientConn) sendGoAway() error {
return nil
}

// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
// closes the client connection immediately. In-flight requests are interrupted.
// err is sent to streams.
func (cc *ClientConn) closeForError(err error) error {
cc.mu.Lock()
defer cc.cond.Broadcast()
defer cc.mu.Unlock()
err := errors.New("http2: client connection force closed via ClientConn.Close")
for id, cs := range cc.streams {
select {
case cs.resc <- resAndError{err: err}:
Expand All @@ -854,6 +887,20 @@ func (cc *ClientConn) Close() error {
return cc.tconn.Close()
}

// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
err := errors.New("http2: client connection force closed via ClientConn.Close")
return cc.closeForError(err)
}

// closes the client connection immediately. In-flight requests are interrupted.
func (cc *ClientConn) closeForLostPing() error {
err := errors.New("http2: client connection force closed because ping frame was not acknowledged")
return cc.closeForError(err)
}

const maxAllocFrameSize = 512 << 10

// frameBuffer returns a scratch buffer suitable for writing DATA frames.
Expand Down Expand Up @@ -1706,8 +1753,17 @@ func (rl *clientConnReadLoop) run() error {
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
readIdleTimeout := cc.t.ReadIdleTimeout
var t *time.Timer
if readIdleTimeout != 0 {
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
defer t.Stop()
}
for {
f, err := cc.fr.ReadFrame()
if t != nil {
t.Reset(readIdleTimeout)
}
if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
Expand Down
150 changes: 150 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3244,6 +3244,156 @@ func TestTransportNoRaceOnRequestObjectAfterRequestComplete(t *testing.T) {
req.Header = http.Header{}
}

func TestTransportCloseAfterLostPing(t *testing.T) {
clientDone := make(chan struct{})
ct := newClientTester(t)
ct.tr.PingTimeout = 1 * time.Second
ct.tr.ReadIdleTimeout = 1 * time.Second
ct.client = func() error {
defer ct.cc.(*net.TCPConn).CloseWrite()
defer close(clientDone)
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
_, err := ct.tr.RoundTrip(req)
if err == nil || !strings.Contains(err.Error(), "ping frame was not acknowledged") {
return fmt.Errorf("expected to get error about \"ping frame was not acknowledged\", got %v", err)
}
return nil
}
ct.server = func() error {
ct.greet()
<-clientDone
return nil
}
ct.run()
}

func TestTransportPingWhenReading(t *testing.T) {
testTransportPingWhenReading(t, 50*time.Millisecond, 110*time.Millisecond, 20)
testTransportPingWhenReading(t, 100*time.Millisecond, 50*time.Millisecond, 0)
// 0 means no ping/pong health check.
testTransportPingWhenReading(t, 0, 50*time.Millisecond, 0)
}

func testTransportPingWhenReading(t *testing.T, readIdleTimeout, serverResponseInterval time.Duration, expectedPingCount int) {
var pingCount int
clientBodyBytes := []byte("hello, this is client")
clientDone := make(chan struct{})
ct := newClientTester(t)
ct.tr.PingTimeout = 10 * time.Millisecond
ct.tr.ReadIdleTimeout = readIdleTimeout
// guards the ct.fr.Write
var wmu sync.Mutex
ct.client = func() error {
defer ct.cc.(*net.TCPConn).CloseWrite()
defer close(clientDone)

req, err := http.NewRequest("PUT", "https://dummy.tld/", bytes.NewReader(clientBodyBytes))
if err != nil {
return err
}
res, err := ct.tr.RoundTrip(req)
if err != nil {
return fmt.Errorf("RoundTrip: %v", err)
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("status code = %v; want %v", res.StatusCode, 200)
}
_, err = ioutil.ReadAll(res.Body)
return err
}
ct.server = func() error {
ct.greet()
var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
var dataRecv int
var closed bool
for {
f, err := ct.fr.ReadFrame()
if err != nil {
select {
case <-clientDone:
// If the client's done, it
// will have reported any
// errors on its side.
return nil
default:
return err
}
}
switch f := f.(type) {
case *WindowUpdateFrame, *SettingsFrame, *HeadersFrame:
case *DataFrame:
dataLen := len(f.Data())
if dataLen > 0 {
err := func() error {
wmu.Lock()
defer wmu.Unlock()
if dataRecv == 0 {
enc.WriteField(hpack.HeaderField{Name: ":status", Value: strconv.Itoa(200)})
ct.fr.WriteHeaders(HeadersFrameParam{
StreamID: f.StreamID,
EndHeaders: true,
EndStream: false,
BlockFragment: buf.Bytes(),
})
}
if err := ct.fr.WriteWindowUpdate(0, uint32(dataLen)); err != nil {
return err
}
if err := ct.fr.WriteWindowUpdate(f.StreamID, uint32(dataLen)); err != nil {
return err
}
return nil
}()
if err != nil {
return err
}
}
dataRecv += dataLen

if !closed && dataRecv == len(clientBodyBytes) {
closed = true
go func() {
for i := 0; i < 10; i++ {
wmu.Lock()
if err := ct.fr.WriteData(f.StreamID, false, []byte(fmt.Sprintf("hello, this is server data frame %d", i))); err != nil {
wmu.Unlock()
t.Error(err)
return
}
wmu.Unlock()
time.Sleep(serverResponseInterval)
}
wmu.Lock()
if err := ct.fr.WriteData(f.StreamID, true, []byte("hello, this is last server frame")); err != nil {
wmu.Unlock()
t.Error(err)
return
}
wmu.Unlock()
}()
}
case *PingFrame:
pingCount++
wmu.Lock()
if err := ct.fr.WritePing(true, f.Data); err != nil {
wmu.Unlock()
return err
}
wmu.Unlock()
default:
return fmt.Errorf("Unexpected client frame %v", f)
}
}
}
ct.run()
if e, a := expectedPingCount, pingCount; e != a {
t.Errorf("expected receiving %d pings, got %d pings", e, a)

}
}

func TestTransportRetryAfterGOAWAY(t *testing.T) {
var dialer struct {
sync.Mutex
Expand Down

0 comments on commit c5e6610

Please sign in to comment.