diff --git a/.github/workflows/linearizability.yaml b/.github/workflows/linearizability.yaml new file mode 100644 index 000000000000..09965953f286 --- /dev/null +++ b/.github/workflows/linearizability.yaml @@ -0,0 +1,11 @@ +name: Linearizability +on: [push, pull_request] +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: "1.19.1" + - run: make test-linearizability diff --git a/Makefile b/Makefile index fb05e5ca5053..556d0cb98fb3 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,10 @@ test-e2e: build test-e2e-release: build PASSES="release e2e" ./scripts/test.sh +.PHONY: test-linearizability +test-linearizability: build + PASSES="linearizability" ./scripts/test.sh + # Static analysis verify: verify-gofmt verify-bom verify-lint verify-dep verify-shellcheck verify-goword verify-govet verify-license-header verify-receiver-name verify-mod-tidy verify-shellcheck verify-shellws verify-proto-annotations diff --git a/bill-of-materials.json b/bill-of-materials.json index fae8496354b6..9c865b55c308 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -8,6 +8,15 @@ } ] }, + { + "project": "github.com/anishathalye/porcupine", + "licenses": [ + { + "type": "MIT License", + "confidence": 1 + } + ] + }, { "project": "github.com/benbjohnson/clock", "licenses": [ diff --git a/go.sum b/go.sum index 70338cde9b5f..f43b3494bd93 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/scripts/test.sh b/scripts/test.sh index 1be4c03578a5..eab6539fe5f4 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -124,6 +124,11 @@ function e2e_pass { run_for_module "tests" go_test "./common/..." "keep_going" : --tags=e2e -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@" } +function linearizability_pass { + # e2e tests are running pre-build binary. Settings like --race,-cover,-cpu does not have any impact. + run_for_module "tests" go_test "./linearizability/..." "keep_going" : -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@" +} + function integration_e2e_pass { run_pass "integration" "${@}" run_pass "e2e" "${@}" diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index 85e304636de3..8acfcac70a8f 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -110,6 +110,10 @@ func (c *e2eCluster) Client(cfg clientv3.AuthConfig) (Client, error) { return e2eClient{etcdctl}, nil } +func (c *e2eCluster) Endpoints() []string { + return c.EndpointsV3() +} + func (c *e2eCluster) Members() (ms []Member) { for _, proc := range c.EtcdProcessCluster.Procs { ms = append(ms, e2eMember{EtcdProcess: proc, Cfg: c.Cfg}) diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 1716198c632b..a4be00c62b33 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -103,6 +103,10 @@ func (p *proxyEtcdProcess) Logs() LogsExpect { return p.etcdProc.Logs() } +func (p *proxyEtcdProcess) Kill() error { + return p.etcdProc.Kill() +} + type proxyProc struct { lg *zap.Logger name string diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 345d2a5a2762..8e9270ed67ab 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "os" + "syscall" "testing" "time" @@ -47,6 +48,7 @@ type EtcdProcess interface { Close() error Config() *EtcdServerProcessConfig Logs() LogsExpect + Kill() error } type LogsExpect interface { @@ -177,6 +179,10 @@ func (ep *EtcdServerProcess) Logs() LogsExpect { return ep.proc } +func (ep *EtcdServerProcess) Kill() error { + return ep.proc.Signal(syscall.SIGKILL) +} + func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { t.Helper() var err error diff --git a/tests/framework/interface.go b/tests/framework/interface.go index 696879be9f29..3f49b3fd7648 100644 --- a/tests/framework/interface.go +++ b/tests/framework/interface.go @@ -33,6 +33,7 @@ type Cluster interface { Client(cfg clientv3.AuthConfig) (Client, error) WaitLeader(t testing.TB) int Close() error + Endpoints() []string } type Member interface { diff --git a/tests/go.mod b/tests/go.mod index 572e232e81f1..dad4d071c3b1 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -15,6 +15,7 @@ replace ( ) require ( + github.com/anishathalye/porcupine v0.1.2 github.com/coreos/go-semver v0.3.0 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 diff --git a/tests/go.sum b/tests/go.sum index 7c4432c9c848..1d758e57df66 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -43,6 +43,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2 h1:eqWNeLcnTzXt6usipDJ4RFn6XOWqY5wEqBYVG3yFLSE= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/tests/linearizability/main_test.go b/tests/linearizability/main_test.go new file mode 100644 index 000000000000..61f84b5322cc --- /dev/null +++ b/tests/linearizability/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "testing" + + "go.etcd.io/etcd/tests/v3/framework" +) + +var testRunner = framework.E2eTestRunner + +func TestMain(m *testing.M) { + testRunner.TestMain(m) +} diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go new file mode 100644 index 000000000000..4d817d5527b6 --- /dev/null +++ b/tests/linearizability/model.go @@ -0,0 +1,112 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "encoding/json" + "fmt" + + "github.com/anishathalye/porcupine" +) + +type Operation int8 + +const Read Operation = 0 +const Put Operation = 1 + +type etcdRequest struct { + op Operation + writeData string +} + +type etcdResponse struct { + readData string + err error +} + +type EtcdState struct { + Value string + FailedWrites []string +} + +var etcdModel = porcupine.Model{ + Init: func() interface{} { return "{}" }, + Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { + stateString := st.(string) + var state EtcdState + err := json.Unmarshal([]byte(stateString), &state) + if err != nil { + panic(err) + } + request := in.(etcdRequest) + response := out.(etcdResponse) + ok, state := step(state, request, response) + data, err := json.Marshal(state) + if err != nil { + panic(err) + } + return ok, string(data) + }, + DescribeOperation: func(in, out interface{}) string { + request := in.(etcdRequest) + response := out.(etcdResponse) + var call, args, resp string + switch request.op { + case Read: + call = "read" + if response.err != nil { + resp = response.err.Error() + } else { + resp = response.readData + } + case Put: + call = "write" + args = request.writeData + if response.err != nil { + resp = response.err.Error() + } else { + resp = "ok" + } + default: + return "" + } + return fmt.Sprintf("%s(%q) -> %s", call, args, resp) + }, +} + +func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { + var ok bool + switch request.op { + case Read: + ok = state.Value == response.readData + if !ok { + for i, write := range state.FailedWrites { + if write == response.readData { + ok = true + state = EtcdState{Value: write, FailedWrites: append(state.FailedWrites[:i], state.FailedWrites[i+1:]...)} + break + } + } + } + case Put: + if response.err == nil { + state.Value = request.writeData + } else { + state.FailedWrites = append(state.FailedWrites, request.writeData) + } + ok = true + } + return ok, state +} diff --git a/tests/linearizability/putget_test.go b/tests/linearizability/putget_test.go new file mode 100644 index 000000000000..35950347559f --- /dev/null +++ b/tests/linearizability/putget_test.go @@ -0,0 +1,193 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "context" + "fmt" + "net/http" + "sync" + "testing" + "time" + + "github.com/anishathalye/porcupine" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/framework/testutils" + "go.uber.org/zap" +) + +const maxOperationsPerClient = 1000000 + +var httpClient = http.Client{ + Timeout: 10 * time.Millisecond, +} +var waitBetweenTriggers = time.Second + +func TestPutGetLinearizability(t *testing.T) { + testRunner.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) + defer cancel() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{ + InitialToken: "new", + ClusterSize: 1, + Restart: true, + }) + if err != nil { + t.Fatal(err) + } + defer clus.Close() + + // Increasing number of failpoints number of tries making test more accurate but requiring more time. + failpointsCount := 60 + minimalQPS := 100.0 + allFailpointsInjected := make(chan struct{}) + go triggerFailpoints(ctx, t, clus, failpointsCount, allFailpointsInjected) + start := time.Now() + operations := simulateTraffic(ctx, t, clus, 1, 8, allFailpointsInjected) + end := time.Now() + + t.Logf("Recorded %d operations", len(operations)) + qps := float64(len(operations)) / float64(end.Sub(start)) * float64(time.Second) + t.Logf("Average traffic: %f qps", qps) + if qps < minimalQPS { + t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", minimalQPS, qps) + } + linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0) + if linearizable != porcupine.Ok { + t.Error("Model is not linearizable, saving visualization to /tmp/results.html") + err := porcupine.VisualizePath(etcdModel, info, "/tmp/results.html") + if err != nil { + t.Errorf("Failed to visualize, err: %v", err) + } + } +} + +func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpointsCount int, finished chan<- struct{}) { + triggers := 0 + + time.Sleep(waitBetweenTriggers) + ctx, cancel := context.WithTimeout(ctx, 2*time.Duration(failpointsCount)*waitBetweenTriggers) + defer cancel() + testutils.ExecuteUntil(ctx, t, func() { + var err error + for { + err = clus.Procs[0].Kill() + if err != nil { + t.Log(err) + continue + } + triggers++ + if triggers >= failpointsCount { + break + } + time.Sleep(waitBetweenTriggers) + } + }) + time.Sleep(waitBetweenTriggers) + close(finished) +} + +func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, memberCount int, clientCount int, finished <-chan struct{}) (operations []porcupine.Operation) { + startTime := time.Now() + mux := sync.Mutex{} + endpoints := clus.EndpointsV3() + if len(endpoints) != memberCount { + t.Fatalf("Unexpected number of endpoints, got %d, expected %d", len(endpoints), memberCount) + } + + testutils.ExecuteUntil(ctx, t, func() { + wg := sync.WaitGroup{} + for i := 0; i < clientCount; i++ { + cc, err := clientv3.New(clientv3.Config{ + Endpoints: []string{endpoints[i%memberCount]}, + Logger: zap.NewNop(), + DialTimeout: 5 * time.Millisecond, + DialKeepAliveTime: 1 * time.Millisecond, + DialKeepAliveTimeout: 5 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + defer cc.Close() + wg.Add(1) + go func(clientId int) { + defer wg.Done() + op, err := putGet(cc, clientId, startTime, finished) + if err != nil { + t.Error(err) + return + } + mux.Lock() + operations = append(operations, op...) + mux.Unlock() + }(i) + } + + wg.Wait() + }) + return operations +} + +func putGet(cc *clientv3.Client, clientId int, startTime time.Time, finished <-chan struct{}) (operations []porcupine.Operation, err error) { + id := maxOperationsPerClient * clientId + ctx := context.Background() + key := "key" + + for i := 0; i < maxOperationsPerClient; { + select { + case <-finished: + return operations, nil + default: + } + getStartTime := time.Now() + getCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + resp, err := cc.Get(getCtx, key) + cancel() + getResponseTime := time.Now() + if err != nil { + continue + } + var readData string + if len(resp.Kvs) == 1 { + readData = string(resp.Kvs[0].Value) + } + operations = append(operations, porcupine.Operation{ + ClientId: clientId, + Input: etcdRequest{op: Read}, + Call: getStartTime.Sub(startTime).Nanoseconds(), + Output: etcdResponse{readData: readData}, + Return: getResponseTime.Sub(startTime).Nanoseconds(), + }) + putData := fmt.Sprintf("%d", id+i) + putStartTime := time.Now() + putCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + _, err = cc.Put(putCtx, key, putData) + cancel() + putResponseTime := time.Now() + operations = append(operations, porcupine.Operation{ + ClientId: clientId, + Input: etcdRequest{op: Put, writeData: putData}, + Call: putStartTime.Sub(startTime).Nanoseconds(), + Output: etcdResponse{err: err}, + Return: putResponseTime.Sub(startTime).Nanoseconds(), + }) + if err != nil { + continue + } + i++ + } + return operations, nil +}