Skip to content

Commit

Permalink
etcdserver: add e2e test to reproduce the incorrect hash issue when r…
Browse files Browse the repository at this point in the history
…esuming scheduled compaction

Signed-off-by: caojiamingalan <[email protected]>
  • Loading branch information
CaojiamingAlan committed May 29, 2023
1 parent bd5d0a5 commit 22ecbc0
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
68 changes: 68 additions & 0 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,71 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}

func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start a new cluster, with compact hash check enabled.
t.Log("creating a new cluster with 1 node...")
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithKeepDataDir(true),
e2e.WithCompactHashCheckEnabled(true),
e2e.WithCompactHashCheckTime(checkTime),
e2e.WithClusterSize(1),
)

t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

// Add a member to the cluster, assign it a very slow compaction speed, so that its compaction can be interrupted.
//
newConfig := *epc.Cfg
newConfig.CompactionBatchLimit = 1
newConfig.CompactionSleepInterval = 1 * time.Hour
t.Log("adding a member whose compaction takes a non-trivial time...")
err = epc.StartNewProc(context.Background(), &newConfig, t)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

// Put 10 identical keys to the cluster, so that the compaction will drop some stale values.
t.Log("putting 10 values to the identical key...")
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
assert.NoError(t, err)
for i := 0; i < 10; i++ {
//use the same key
err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}

// HACK: manually change the --experimental-compaction-sleep-interval to a small value,
// so that the compaction can finish.
// todo: add disable method to robustness/failpoint interface
t.Log("compaction started...")
_, err = cc.Compact(ctx, 5, config.CompactOption{})
for i, arg := range epc.Procs[1].Config().Args {
if arg == "--experimental-compaction-sleep-interval" {
epc.Procs[1].Config().Args[i+1] = "1ms"
}
}

t.Log("restart proc 1 to interrupt its compaction...")
err = epc.Procs[1].Restart(ctx)

t.Log("sleeping 5 sec to let nodes do compact hash check...")
time.Sleep(5 * time.Second)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
for _, alarm := range alarmResponse.Alarms {
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
t.Fatalf("there should be no corruption after resuming the compaction, but corruption detected")
}
}
t.Log("no corruption detected.")
}
8 changes: 8 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration
GoFailEnabled bool
CompactionBatchLimit int
CompactionSleepInterval time.Duration

WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
Expand Down Expand Up @@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}

func WithCompactionSleepInterval(time time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time }
}

func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
}
Expand Down Expand Up @@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}
if cfg.CompactionSleepInterval != 0 {
args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String())
}
if cfg.WarningUnaryRequestDuration != 0 {
args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String())
}
Expand Down

0 comments on commit 22ecbc0

Please sign in to comment.