diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index e9f7a66c413e..241dd09ffb42 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -193,3 +193,71 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err, "error on alarm list") assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) } + +func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) { + checkTime := time.Second + e2e.BeforeTest(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start a new cluster, with compact hash check enabled. + t.Log("creating a new cluster with 1 node...") + epc, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithKeepDataDir(true), + e2e.WithCompactHashCheckEnabled(true), + e2e.WithCompactHashCheckTime(checkTime), + e2e.WithClusterSize(1), + ) + + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // Add a member to the cluster, assign it a very slow compaction speed, so that its compaction can be interrupted. + // + newConfig := *epc.Cfg + newConfig.CompactionBatchLimit = 1 + newConfig.CompactionSleepInterval = 1 * time.Hour + t.Log("adding a member whose compaction takes a non-trivial time...") + err = epc.StartNewProc(context.Background(), &newConfig, t) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + + // Put 10 identical keys to the cluster, so that the compaction will drop some stale values. + t.Log("putting 10 values to the identical key...") + cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC()) + assert.NoError(t, err) + for i := 0; i < 10; i++ { + //use the same key + err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{}) + assert.NoError(t, err, "error on put") + } + + // HACK: manually change the --experimental-compaction-sleep-interval to a small value, + // so that the compaction can finish. + // todo: add disable method to robustness/failpoint interface + t.Log("compaction started...") + _, err = cc.Compact(ctx, 5, config.CompactOption{}) + for i, arg := range epc.Procs[1].Config().Args { + if arg == "--experimental-compaction-sleep-interval" { + epc.Procs[1].Config().Args[i+1] = "1ms" + } + } + + t.Log("restart proc 1 to interrupt its compaction...") + err = epc.Procs[1].Restart(ctx) + + t.Log("sleeping 5 sec to let nodes do compact hash check...") + time.Sleep(5 * time.Second) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatalf("there should be no corruption after resuming the compaction, but corruption detected") + } + } + t.Log("no corruption detected.") +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 0eafc4579104..913dd31fab45 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct { CompactHashCheckTime time.Duration GoFailEnabled bool CompactionBatchLimit int + CompactionSleepInterval time.Duration WarningUnaryRequestDuration time.Duration ExperimentalWarningUnaryRequestDuration time.Duration @@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit } } +func WithCompactionSleepInterval(time time.Duration) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time } +} + func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval } } @@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.CompactionBatchLimit != 0 { args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) } + if cfg.CompactionSleepInterval != 0 { + args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String()) + } if cfg.WarningUnaryRequestDuration != 0 { args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String()) }