Skip to content

Commit

Permalink
Merge pull request #16678 from ahrtr/linearizable_read_20231002
Browse files Browse the repository at this point in the history
test: add test cases to verify consistent reading right after writing
  • Loading branch information
ahrtr authored Oct 4, 2023
2 parents 1c5289d + b385121 commit 0f14106
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,95 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
}
t.Log("no corruption detected.")
}

func TestCtlV3SerializableRead(t *testing.T) {
testCtlV3ReadAfterWrite(t, clientv3.WithSerializable())
}

func TestCtlV3LinearizableRead(t *testing.T) {
testCtlV3ReadAfterWrite(t)
}

func testCtlV3ReadAfterWrite(t *testing.T, ops ...clientv3.OpOption) {
e2e.BeforeTest(t)

ctx := context.Background()

epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(1),
e2e.WithEnvVars(map[string]string{"GOFAIL_FAILPOINTS": `raftBeforeSave=sleep("200ms");beforeCommit=sleep("200ms")`}),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
}()

cc, err := clientv3.New(clientv3.Config{
Endpoints: epc.EndpointsGRPC(),
DialKeepAliveTime: 5 * time.Second,
DialKeepAliveTimeout: 1 * time.Second,
})
require.NoError(t, err)
defer func() {
derr := cc.Close()
require.NoError(t, derr)
}()

_, err = cc.Put(ctx, "foo", "bar")
require.NoError(t, err)

// Refer to https://github.com/etcd-io/etcd/pull/16658#discussion_r1341346778
t.Log("Restarting the etcd process to ensure all data is persisted")
err = epc.Procs[0].Restart(ctx)
require.NoError(t, err)
epc.WaitLeader(t)

_, err = cc.Put(ctx, "foo", "bar2")
require.NoError(t, err)

t.Log("Killing the etcd process right after successfully writing a new key/value")
err = epc.Procs[0].Kill()
require.NoError(t, err)
err = epc.Procs[0].Wait(ctx)
require.NoError(t, err)

stopc := make(chan struct{}, 1)
donec := make(chan struct{}, 1)

t.Log("Starting a goroutine to repeatedly read the key/value")
count := 0
go func() {
defer func() {
donec <- struct{}{}
}()
for {
select {
case <-stopc:
return
default:
}

rctx, cancel := context.WithTimeout(ctx, 2*time.Second)
resp, rerr := cc.Get(rctx, "foo", ops...)
cancel()
if rerr != nil {
continue
}

count++
require.Equal(t, "bar2", string(resp.Kvs[0].Value))
}
}()

t.Log("Starting the etcd process again")
err = epc.Procs[0].Start(ctx)
require.NoError(t, err)

time.Sleep(3 * time.Second)
stopc <- struct{}{}

<-donec
assert.Greater(t, count, 0)
t.Logf("Checked the key/value %d times", count)
}

0 comments on commit 0f14106

Please sign in to comment.