Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keepalive: apply minimum ping time of 10s to client and 1s to server #2642

Merged
merged 2 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,25 +939,61 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
t.Fatalf("Failed to listen. Err: %v", err)
}
defer lis.Close()
connected := make(chan struct{})
go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("error accepting connection: %v", err)
return
}
defer conn.Close()
f := http2.NewFramer(conn, conn)
// Start a goroutine to read from the conn to prevent the client from
// blocking after it writes its preface.
go func() {
for {
if _, err := f.ReadFrame(); err != nil {
return
}
}
}()
if err := f.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("error writing settings: %v", err)
return
}
<-connected
if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil {
t.Errorf("error writing GOAWAY: %v", err)
return
}
}()
addr := lis.Addr().String()
s := NewServer()
go s.Serve(lis)
defer s.Stop()
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
Time: 50 * time.Millisecond,
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 100 * time.Millisecond,
PermitWithoutStream: true,
}))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
time.Sleep(1 * time.Second)
cc.mu.RLock()
defer cc.mu.RUnlock()
v := cc.mkp.Time
if v < 100*time.Millisecond {
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
close(connected)
for {
time.Sleep(10 * time.Millisecond)
cc.mu.RLock()
v := cc.mkp.Time
if v == 20*time.Second {
// Success
cc.mu.RUnlock()
return
}
if ctx.Err() != nil {
// Timeout
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
}
cc.mu.RUnlock()
}
}

Expand Down
5 changes: 5 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -388,6 +389,10 @@ func WithUserAgent(s string) DialOption {
// WithKeepaliveParams returns a DialOption that specifies keepalive parameters
// for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
if kp.Time < internal.KeepaliveMinPingTime {
grpclog.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
kp.Time = internal.KeepaliveMinPingTime
}
return newFuncDialOption(func(o *dialOptions) {
o.copts.KeepaliveParams = kp
})
Expand Down
8 changes: 7 additions & 1 deletion internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// symbols to avoid circular dependencies.
package internal

import "context"
import (
"context"
"time"
)

var (
// WithContextDialer is exported by dialoptions.go
Expand All @@ -33,6 +36,9 @@ var (
HealthCheckFunc HealthChecker
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
2 changes: 2 additions & 0 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it
// pings the server to see if the transport is still alive.
// If set below 10s, a minimum value of 10s will be used instead.
Time time.Duration // The current default value is infinity.
// After having pinged for keepalive check, the client waits for a duration
// of Timeout and if no activity is seen even after that the connection is
Expand Down Expand Up @@ -62,6 +63,7 @@ type ServerParameters struct {
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it
// pings the client to see if the transport is still alive.
// If set below 1s, a minimum value of 1s will be used instead.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
Time time.Duration // The current default value is 2 hours.
// After having pinged for keepalive check, the server waits for a duration
// of Timeout and if no activity is seen even after that the connection is
Expand Down
5 changes: 5 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func InitialConnWindowSize(s int32) ServerOption {

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
if kp.Time > 0 && kp.Time < time.Second {
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
kp.Time = time.Second
}

return func(o *options) {
o.keepaliveParams = kp
}
Expand Down
28 changes: 19 additions & 9 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -847,8 +848,8 @@ func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
if err != nil {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
}
// 2500ms allow for 2 keepalives (1000ms per round trip)
time.Sleep(2500 * time.Millisecond)
// Allow for at least 2 keepalives (1s per ping interval)
time.Sleep(4 * time.Second)
cancel()
}

Expand Down Expand Up @@ -1125,15 +1126,24 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {

func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
channelz.NewChannelzStorage()
defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
internal.KeepaliveMinPingTime = time.Second
e := tcpClearRREnv
te := newTest(t, e)
te.cliKeepAlive = &keepalive.ClientParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: time.Second,
Timeout: 500 * time.Millisecond,
PermitWithoutStream: true,
}))
te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: 500 * time.Millisecond,
PermitWithoutStream: true,
}))
te.startServer(&testServer{security: e.security})
te.clientConn() // Dial the server
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
doIdleCallToInvokeKeepAlive(tc, t)

if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
Expand All @@ -1157,7 +1167,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
break
}
skt := channelz.GetSocket(id)
if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
if skt.SocketData.KeepAlivesSent != 2 {
return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
}
return true, nil
Expand Down Expand Up @@ -1230,7 +1240,7 @@ func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
channelz.NewChannelzStorage()
e := tcpClearRREnv
te := newTest(t, e)
te.svrKeepAlive = &keepalive.ServerParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{Time: time.Second, Timeout: 500 * time.Millisecond}))
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
Expand Down
9 changes: 0 additions & 9 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -510,8 +509,6 @@ type test struct {
customDialOptions []grpc.DialOption
customServerOptions []grpc.ServerOption
resolverScheme string
cliKeepAlive *keepalive.ClientParameters
svrKeepAlive *keepalive.ServerParameters

// All test dialing is blocking by default. Set this to true if dial
// should be non-blocking.
Expand Down Expand Up @@ -633,9 +630,6 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
case "clientTimeoutCreds":
sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
}
if te.svrKeepAlive != nil {
sopts = append(sopts, grpc.KeepaliveParams(*te.svrKeepAlive))
}
sopts = append(sopts, te.customServerOptions...)
s := grpc.NewServer(sopts...)
te.srv = s
Expand Down Expand Up @@ -873,9 +867,6 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
if te.srvAddr == "" {
te.srvAddr = "client.side.only.test"
}
if te.cliKeepAlive != nil {
opts = append(opts, grpc.WithKeepaliveParams(*te.cliKeepAlive))
}
opts = append(opts, te.customDialOptions...)
return opts, scheme
}
Expand Down