From a37dd0055f7f5fcac62148c402c897ddbe875cda Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Mon, 30 Oct 2017 17:16:01 -0700
Subject: [PATCH] clientv3/integration: move to
 TestBalancerUnderBlackholeKeepAliveWatch

Signed-off-by: Gyu-Ho Lee
---
 clientv3/integration/black_hole_test.go      |  76 ++++++++++++++
 clientv3/integration/watch_keepalive_test.go | 103 -------------------
 2 files changed, 76 insertions(+), 103 deletions(-)
 delete mode 100644 clientv3/integration/watch_keepalive_test.go

diff --git a/clientv3/integration/black_hole_test.go b/clientv3/integration/black_hole_test.go
index 1960c909c88..f42d77c58ef 100644
--- a/clientv3/integration/black_hole_test.go
+++ b/clientv3/integration/black_hole_test.go
@@ -26,6 +26,82 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 )
 
+// TestBalancerUnderBlackholeKeepAliveWatch tests that when the watch discovers
+// it cannot talk to the blackholed endpoint, the client balancer switches to a healthy one.
+// TODO: test server-to-client keepalive ping
+func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
+		Size:                 2,
+		GRPCKeepAliveMinTime: 1 * time.Millisecond, // avoid too_many_pings
+	})
+	defer clus.Terminate(t)
+
+	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
+
+	ccfg := clientv3.Config{
+		Endpoints:            []string{eps[0]},
+		DialTimeout:          1 * time.Second,
+		DialKeepAliveTime:    1 * time.Second,
+		DialKeepAliveTimeout: 500 * time.Millisecond,
+	}
+
+	// tied to gRPC's internal keepalive implementation
+	pingInterval := ccfg.DialKeepAliveTime + ccfg.DialKeepAliveTimeout
+	// allow 3s for a slow machine to process the watch and reset connections
+	// TODO: only pass healthy endpoints to gRPC so it won't waste time
+	// dialing the unhealthy endpoint;
+	// then we can reduce 3s to 1s.
+	timeout := pingInterval + 3*time.Second
+
+	cli, err := clientv3.New(ccfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+
+	wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
+	if _, ok := <-wch; !ok {
+		t.Fatalf("watch failed on creation")
+	}
+
+	// the balancer can switch to eps[1] when it detects the failure of eps[0]
+	cli.SetEndpoints(eps...)
+
+	clus.Members[0].Blackhole()
+
+	if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
+		t.Fatal(err)
+	}
+	select {
+	case <-wch:
+	case <-time.After(timeout):
+		t.Error("took too long to receive watch events")
+	}
+
+	clus.Members[0].Unblackhole()
+
+	// wait for eps[0] to be moved out of the unhealthy set so that it can be re-pinned.
+	time.Sleep(ccfg.DialTimeout)
+
+	clus.Members[1].Blackhole()
+
+	// make sure client[0] can connect to eps[0] after the blackhole is removed.
+	if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err = clus.Client(0).Put(context.TODO(), "foo", "bar1"); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-wch:
+	case <-time.After(timeout):
+		t.Error("took too long to receive watch events")
+	}
+}
+
 func TestBalancerUnderBlackholeNoKeepAlivePut(t *testing.T) {
 	testBalancerUnderBlackholeNoKeepAliveMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
 		_, err := cli.Put(ctx, "foo", "bar")
diff --git a/clientv3/integration/watch_keepalive_test.go b/clientv3/integration/watch_keepalive_test.go
deleted file mode 100644
index 1de36aa3c1a..00000000000
--- a/clientv3/integration/watch_keepalive_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2017 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// +build !cluster_proxy
-
-package integration
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/integration"
-	"github.com/coreos/etcd/pkg/testutil"
-)
-
-// TestWatchKeepAlive tests when watch discovers it cannot talk to
-// blackholed endpoint, client balancer switches to healthy one.
-// TODO: test server-to-client keepalive ping
-func TestWatchKeepAlive(t *testing.T) {
-	defer testutil.AfterTest(t)
-
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
-		Size:                 2,
-		GRPCKeepAliveMinTime: 1 * time.Millisecond},
-	) // avoid too_many_pings
-
-	defer clus.Terminate(t)
-
-	ccfg := clientv3.Config{
-		Endpoints:            []string{clus.Members[0].GRPCAddr()},
-		DialTimeout:          1 * time.Second,
-		DialKeepAliveTime:    1 * time.Second,
-		DialKeepAliveTimeout: 500 * time.Millisecond,
-	}
-
-	// gRPC internal implementation related.
-	pingInterval := ccfg.DialKeepAliveTime + ccfg.DialKeepAliveTimeout
-	// 3s for slow machine to process watch and reset connections
-	// TODO: only send healthy endpoint to gRPC so gRPC wont waste time to
-	// dial for unhealthy endpoint.
-	// then we can reduce 3s to 1s.
-	timeout := pingInterval + 3*time.Second
-
-	cli, err := clientv3.New(ccfg)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer cli.Close()
-
-	wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
-	if _, ok := <-wch; !ok {
-		t.Fatalf("watch failed on creation")
-	}
-
-	// endpoint can switch to ep[1] when it detects the failure of ep0
-	cli.SetEndpoints(clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr())
-
-	clus.Members[0].Blackhole()
-
-	if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
-		t.Fatal(err)
-	}
-	select {
-	case <-wch:
-	case <-time.After(timeout):
-		t.Error("took too long to receive watch events")
-	}
-
-	clus.Members[0].Unblackhole()
-
-	// waiting for moving ep0 out of unhealthy, so that it can be re-pined.
-	time.Sleep(ccfg.DialTimeout)
-
-	clus.Members[1].Blackhole()
-
-	// make sure client0 can connect to member 0 after remove the blackhole.
-	if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil {
-		t.Fatal(err)
-	}
-
-	if _, err = clus.Client(0).Put(context.TODO(), "foo", "bar1"); err != nil {
-		t.Fatal(err)
-	}
-
-	select {
-	case <-wch:
-	case <-time.After(timeout):
-		t.Error("took too long to receive watch events")
-	}
-}