Skip to content

Commit

Permalink
clientv3/integration: add more tests on balancer switch, inflight range
Browse files Browse the repository at this point in the history
Test all possible cases of server shutdown with inflight range requests.
Removed redundant tests in kv_test.go.

Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Nov 21, 2017
1 parent 15bfc1b commit 96cd8ac
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 47 deletions.
47 changes: 0 additions & 47 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,53 +825,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
}
}

// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
// endpoints are down, then it will reconnect.
func TestKVGetResetLoneEndpoint(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true})
defer clus.Terminate(t)

// get endpoint list
eps := make([]string, 2)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
}

cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// disconnect everything
clus.Members[0].Stop(t)
clus.Members[1].Stop(t)

// have Get try to reconnect
donec := make(chan struct{})
go func() {
// 3-second is the minimum interval between endpoint being marked
// as unhealthy and being removed from unhealthy, so possibly
// takes >5-second to unpin and repin an endpoint
// TODO: decrease timeout when balancer switch rewrite
ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second)
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}
cancel()
close(donec)
}()
time.Sleep(500 * time.Millisecond)
clus.Members[0].Restart(t)
select {
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for Get")
case <-donec:
}
}

// TestKVPutAtMostOnce ensures that a Put will only occur at most once
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {
Expand Down
114 changes: 114 additions & 0 deletions clientv3/integration/server_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,117 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
t.Errorf("failed to finish range request in time %v (timeout %v)", err, timeout)
}
}

func TestBalancerUnderServerStopInflightLinearizableGetOnRestart(t *testing.T) {
tt := []pinTestOpt{
{pinLeader: true, stopPinFirst: true},
{pinLeader: true, stopPinFirst: false},
{pinLeader: false, stopPinFirst: true},
{pinLeader: false, stopPinFirst: false},
}
for i := range tt {
testBalancerUnderServerStopInflightRangeOnRestart(t, true, tt[i])
}
}

func TestBalancerUnderServerStopInflightSerializableGetOnRestart(t *testing.T) {
tt := []pinTestOpt{
{pinLeader: true, stopPinFirst: true},
{pinLeader: true, stopPinFirst: false},
{pinLeader: false, stopPinFirst: true},
{pinLeader: false, stopPinFirst: false},
}
for i := range tt {
testBalancerUnderServerStopInflightRangeOnRestart(t, false, tt[i])
}
}

type pinTestOpt struct {
pinLeader bool
stopPinFirst bool
}

// testBalancerUnderServerStopInflightRangeOnRestart expects
// inflight range request reconnects on server restart.
func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable bool, opt pinTestOpt) {
defer testutil.AfterTest(t)

cfg := &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
}
if linearizable {
cfg.Size = 3
}

clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
if linearizable {
eps = append(eps, clus.Members[2].GRPCAddr())
}

lead := clus.WaitLeader(t)

target := lead
if !opt.pinLeader {
target = (target + 1) % 2
}

// pin eps[target]
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
if err != nil {
t.Errorf("failed to create client: %v", err)
}
defer cli.Close()

// wait for eps[target] to be pinned
mustWaitPinReady(t, cli)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

if opt.stopPinFirst {
clus.Members[target].Stop(t)
// give some time for balancer switch before stopping the other
time.Sleep(time.Second)
clus.Members[(target+1)%2].Stop(t)
} else {
clus.Members[(target+1)%2].Stop(t)
// balancer cannot pin other member since it's already stopped
clus.Members[target].Stop(t)
}

// 3-second is the minimum interval between endpoint being marked
// as unhealthy and being removed from unhealthy, so possibly
// takes >5-second to unpin and repin an endpoint
// TODO: decrease timeout when balancer switch rewrite
clientTimeout := 7 * time.Second

var gops []clientv3.OpOption
if !linearizable {
gops = append(gops, clientv3.WithSerializable())
}

donec, readyc := make(chan struct{}), make(chan struct{}, 1)
go func() {
defer close(donec)
ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
readyc <- struct{}{}
_, err := cli.Get(ctx, "abc", gops...)
cancel()
if err != nil {
t.Fatal(err)
}
}()

<-readyc
clus.Members[target].Restart(t)

select {
case <-time.After(clientTimeout + 3*time.Second):
t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
case <-donec:
}
}

0 comments on commit 96cd8ac

Please sign in to comment.