From 897a3460f2c3af4d40740beee6e643abdb24a605 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 21 Jul 2017 10:53:07 -0700 Subject: [PATCH] integration: add TestV3CompactInflightHashKV in v3_grpc_inflight_test.go --- integration/v3_grpc_inflight_test.go | 125 +++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/integration/v3_grpc_inflight_test.go b/integration/v3_grpc_inflight_test.go index 1994af06dd9e..d26fce241c68 100644 --- a/integration/v3_grpc_inflight_test.go +++ b/integration/v3_grpc_inflight_test.go @@ -15,6 +15,7 @@ package integration import ( + "strconv" "sync" "testing" "time" @@ -54,6 +55,130 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { <-donec } +// TestV3CompactInflightHashKV ensures that HashKV returns +// correct hash while compacting. +func TestV3CompactInflightHashKV(t *testing.T) { + defer testutil.AfterTest(t) + clusterSize := 3 + clus := NewClusterV3(t, &ClusterConfig{Size: clusterSize}) + defer clus.Terminate(t) + kvcs := make([]pb.KVClient, 0) + mvcs := make([]pb.MaintenanceClient, 0) + + for i := 0; i < clusterSize; i++ { + kvcs = append(kvcs, toGRPC(clus.Client(i)).KV) + mvcs = append(mvcs, toGRPC(clus.Client(i)).Maintenance) + } + cctx, cancel := context.WithCancel(context.Background()) + var rev int64 + for i := 0; i < 100; i++ { + resp, err := kvcs[0].Put(cctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar" + strconv.Itoa(i))}) + if err != nil && cctx.Err() == nil { + t.Fatal(err) + } + rev = resp.Header.Revision + } + + // ensure appliedIdx is sync with committedIdx for each node. + for _, kvc := range kvcs { + _, err := kvc.Range(cctx, &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + } + + hashesc := make(chan *pb.HashKVResponse, 3) + donec := make(chan struct{}) + var wg sync.WaitGroup + for i, _ := range mvcs { + wg.Add(1) + go func(cliIdx int) { + defer wg.Done() + for { + resp, err := mvcs[cliIdx].HashKV(cctx, &pb.HashKVRequest{rev}) + if err != nil && cctx.Err() == nil { + t.Fatal(err) + } + + select { + case hashesc <- resp: + case <-donec: + return + case <-cctx.Done(): + return + } + } + }(i) + } + + compactRevs := make([]int64, 0) + for i := 10; i >= 0; i-- { + compactRevs = append(compactRevs, rev-1-int64(i)) + } + wg.Add(1) + go func() { + defer wg.Done() + for _, compactRev := range compactRevs { + if _, err := kvcs[0].Compact(cctx, &pb.CompactionRequest{Revision: compactRev}); err != nil && cctx.Err() == nil { + t.Fatal(err) + } + } + }() + + go func() { + defer close(donec) + revHashCounters := make(map[int64]int) + revHash := make(map[int64]uint32) + f := func() bool { + // HashKV before compaction returns -1 for compaction rev. + _, ok := revHash[-1] + if ok && len(revHashCounters) != len(compactRevs)+1 { + return false + } + if !ok && len(revHashCounters) != len(compactRevs) { + return false + } + + for _, v := range revHashCounters { + if v < 2 { + return false + } + } + for _, compactRev := range compactRevs { + if _, ok := revHash[compactRev]; !ok { + t.Fatalf("revHash doesn't contains compact rev %v", compactRev) + } + } + return true + } + + for !f() { + select { + case resp := <-hashesc: + revHashCounters[resp.CompactRevision]++ + if revHash[resp.CompactRevision] == 0 { + revHash[resp.CompactRevision] = resp.Hash + break + } + if resp.Hash != revHash[resp.CompactRevision] { + t.Fatalf("Hashes differ (current %v) != (saved %v)", resp.Hash, revHash[resp.CompactRevision]) + } + case <-cctx.Done(): + return + } + } + }() + + select { + case <-donec: + cancel() + case <-time.After(10 * time.Second): + cancel() + <-donec + } + wg.Wait() +} + // TestV3KVInflightRangeRequests ensures that inflight requests // (sent before server shutdown) are gracefully handled by server-side. // They are either finished or canceled, but never crash the backend.