diff --git a/etcdutl/go.mod b/etcdutl/go.mod index acbf036ab0a6..261b10134621 100644 --- a/etcdutl/go.mod +++ b/etcdutl/go.mod @@ -56,6 +56,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/etcd/client/v2 v2.306.0-alpha.0 // indirect + go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect diff --git a/etcdutl/go.sum b/etcdutl/go.sum index 92c0e0fa7029..4811d50b027f 100644 --- a/etcdutl/go.sum +++ b/etcdutl/go.sum @@ -261,6 +261,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/go.mod b/go.mod index 74956a2eb692..2e494c705edb 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/stretchr/testify v1.7.2 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect diff --git a/go.sum b/go.sum index 70338cde9b5f..8b0fac3bedf7 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -327,6 +328,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index 3eb636aacbc8..763386f85927 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -36,9 +36,13 @@ const DEBUG_LINES_TAIL = 40 type ExpectProcess struct { cfg expectConfig - cmd *exec.Cmd - fpty *os.File - wg sync.WaitGroup + // StopSignal is the signal Stop sends to the process; defaults to SIGTERM. + StopSignal os.Signal + + cmd *exec.Cmd + fpty *os.File + closech chan struct{} + logsCollected sync.WaitGroup mu sync.Mutex // protects lines and err lines []string @@ -50,18 +54,20 @@ type ExpectProcess struct { // NewExpect creates a new process for expect testing. func NewExpect(name string, arg ...string) (ep *ExpectProcess, err error) { // if env[] is nil, use current system env and the default command as name - return NewExpectWithEnv(name, arg, nil, name) + return NewExpectWithEnv(name, arg, nil, name, false) } // NewExpectWithEnv creates a new process with user defined env variables for expect testing. -func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string) (ep *ExpectProcess, err error) { +func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string, restart bool) (ep *ExpectProcess, err error) { ep = &ExpectProcess{ cfg: expectConfig{ - name: serverProcessConfigName, - cmd: name, - args: args, - env: env, + name: serverProcessConfigName, + cmd: name, + args: args, + env: env, + restart: restart, }, + closech: make(chan struct{}), } ep.cmd = commandFromConfig(ep.cfg) @@ -69,16 +75,17 @@ func NewExpectWithEnv(name string, args []string, env []string, serverProcessCon return nil, err } - ep.wg.Add(1) + ep.logsCollected.Add(1) go ep.read() return ep, nil } type expectConfig struct { - name string - cmd string - args []string - env []string + name string + cmd string + args []string + env []string + restart bool } func commandFromConfig(config expectConfig) *exec.Cmd { @@ -94,23 +101,52 @@ func (ep *ExpectProcess) Pid() int { } func (ep *ExpectProcess) read() { - defer ep.wg.Done() + defer ep.logsCollected.Done() printDebugLines := os.Getenv("EXPECT_DEBUG") != "" - r := bufio.NewReader(ep.fpty) for { - l, err := r.ReadString('\n') ep.mu.Lock() - if l != "" { - if printDebugLines { - fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l) + cmd := ep.cmd + r := bufio.NewReader(ep.fpty) + ep.mu.Unlock() + if cmd == nil { + break + } + pid := cmd.Process.Pid + for { + l, err := r.ReadString('\n') + ep.mu.Lock() + if l != "" { + if printDebugLines { + fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, pid, l) + } + ep.lines = append(ep.lines, l) + ep.count++ } - ep.lines = append(ep.lines, l) - ep.count++ + if err != nil { + ep.err = err + ep.mu.Unlock() + break + } + ep.mu.Unlock() } - if err != nil { + select { + case <-ep.closech: + return + default: + } + ep.mu.Lock() + cmd = ep.cmd + ep.mu.Unlock() + if cmd != nil { + cmd.Wait() + } + ep.mu.Lock() + var err error + ep.cmd = commandFromConfig(ep.cfg) + if ep.fpty, err = pty.Start(ep.cmd); err != nil { + fmt.Printf("Error %s\n", err) ep.err = err - ep.mu.Unlock() - break + ep.cmd = nil } ep.mu.Unlock() } @@ -179,7 +215,10 @@ func (ep *ExpectProcess) Stop() error { return ep.close(true) } // Signal sends a signal to the expect process func (ep *ExpectProcess) Signal(sig os.Signal) error { - return ep.cmd.Process.Signal(sig) + ep.mu.Lock() + err := ep.cmd.Process.Signal(sig) + ep.mu.Unlock() + return err } // Close waits for the expect process to exit. @@ -188,16 +227,22 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error { func (ep *ExpectProcess) Close() error { return ep.close(false) } func (ep *ExpectProcess) close(kill bool) error { - if ep.cmd == nil { + ep.mu.Lock() + cmd := ep.cmd + ep.mu.Unlock() + + if cmd == nil { return ep.err } + close(ep.closech) + if kill { ep.Signal(syscall.SIGTERM) } - err := ep.cmd.Wait() + err := cmd.Wait() ep.fpty.Close() - ep.wg.Wait() + ep.logsCollected.Wait() if err != nil { if !kill && strings.Contains(err.Error(), "exit status") { @@ -207,8 +252,9 @@ func (ep *ExpectProcess) close(kill bool) error { err = nil } } - + ep.mu.Lock() ep.cmd = nil + ep.mu.Unlock() return err } diff --git a/server/etcdserver/raft.fail.go b/server/etcdserver/raft.fail.go new file mode 100644 index 000000000000..6f1a78bc3136 --- /dev/null +++ b/server/etcdserver/raft.fail.go @@ -0,0 +1,16 @@ +// GENERATED BY GOFAIL. DO NOT EDIT. + +package etcdserver + +import "go.etcd.io/gofail/runtime" + +var __fp_raftBeforeSaveWaitWalSync *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftBeforeSaveWaitWalSync", false) +var __fp_raftBeforeLeaderSend *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftBeforeLeaderSend", false) +var __fp_raftBeforeSaveSnap *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftBeforeSaveSnap", false) +var __fp_raftAfterSaveSnap *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftAfterSaveSnap", false) +var __fp_raftBeforeSave *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftBeforeSave", false) +var __fp_raftAfterSave *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftAfterSave", false) +var __fp_raftBeforeApplySnap *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftBeforeApplySnap", false) +var __fp_raftAfterApplySnap *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftAfterApplySnap", false) +var __fp_raftAfterWALRelease *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftAfterWALRelease", false) +var __fp_raftBeforeFollowerSend *runtime.Failpoint = runtime.NewFailpoint("etcdserver", "raftBeforeFollowerSend", false) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 5dea231b9df8..ecb4f67c03fd 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -211,7 +211,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { waitWALSync := shouldWaitWALSync(rd) if waitWALSync { - // gofail: var raftBeforeSaveWaitWalSync struct{} + if vraftBeforeSaveWaitWalSync, __fpErr := __fp_raftBeforeSaveWaitWalSync.Acquire(); __fpErr == nil { defer __fp_raftBeforeSaveWaitWalSync.Release(); _, __fpTypeOK := vraftBeforeSaveWaitWalSync.(struct{}); if !__fpTypeOK { goto __badTyperaftBeforeSaveWaitWalSync} ; __badTyperaftBeforeSaveWaitWalSync: __fp_raftBeforeSaveWaitWalSync.BadType(vraftBeforeSaveWaitWalSync, "struct{}"); }; if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } @@ -227,22 +227,22 @@ func (r *raftNode) start(rh *raftReadyHandler) { // writing to their disks. // For more details, check raft thesis 10.2.1 if islead { - // gofail: var raftBeforeLeaderSend struct{} + if vraftBeforeLeaderSend, __fpErr := __fp_raftBeforeLeaderSend.Acquire(); __fpErr == nil { defer __fp_raftBeforeLeaderSend.Release(); _, __fpTypeOK := vraftBeforeLeaderSend.(struct{}); if !__fpTypeOK { goto __badTyperaftBeforeLeaderSend} ; __badTyperaftBeforeLeaderSend: __fp_raftBeforeLeaderSend.BadType(vraftBeforeLeaderSend, "struct{}"); }; r.transport.Send(r.processMessages(rd.Messages)) } // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to // ensure that recovery after a snapshot restore is possible. if !raft.IsEmptySnap(rd.Snapshot) { - // gofail: var raftBeforeSaveSnap struct{} + if vraftBeforeSaveSnap, __fpErr := __fp_raftBeforeSaveSnap.Acquire(); __fpErr == nil { defer __fp_raftBeforeSaveSnap.Release(); _, __fpTypeOK := vraftBeforeSaveSnap.(struct{}); if !__fpTypeOK { goto __badTyperaftBeforeSaveSnap} ; __badTyperaftBeforeSaveSnap: __fp_raftBeforeSaveSnap.BadType(vraftBeforeSaveSnap, "struct{}"); }; if err := r.storage.SaveSnap(rd.Snapshot); err != nil { r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) } - // gofail: var raftAfterSaveSnap struct{} + if vraftAfterSaveSnap, __fpErr := __fp_raftAfterSaveSnap.Acquire(); __fpErr == nil { defer __fp_raftAfterSaveSnap.Release(); _, __fpTypeOK := vraftAfterSaveSnap.(struct{}); if !__fpTypeOK { goto __badTyperaftAfterSaveSnap} ; __badTyperaftAfterSaveSnap: __fp_raftAfterSaveSnap.BadType(vraftAfterSaveSnap, "struct{}"); }; } if !waitWALSync { - // gofail: var raftBeforeSave struct{} + if vraftBeforeSave, __fpErr := __fp_raftBeforeSave.Acquire(); __fpErr == nil { defer __fp_raftBeforeSave.Release(); _, __fpTypeOK := vraftBeforeSave.(struct{}); if !__fpTypeOK { goto __badTyperaftBeforeSave} ; __badTyperaftBeforeSave: __fp_raftBeforeSave.BadType(vraftBeforeSave, "struct{}"); }; if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } @@ -250,7 +250,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } - // gofail: var raftAfterSave struct{} + if vraftAfterSave, __fpErr := __fp_raftAfterSave.Acquire(); __fpErr == nil { defer __fp_raftAfterSave.Release(); _, __fpTypeOK := vraftAfterSave.(struct{}); if !__fpTypeOK { goto __badTyperaftAfterSave} ; __badTyperaftAfterSave: __fp_raftAfterSave.BadType(vraftAfterSave, "struct{}"); }; if !raft.IsEmptySnap(rd.Snapshot) { // Force WAL to fsync its hard state before Release() releases @@ -264,15 +264,15 @@ func (r *raftNode) start(rh *raftReadyHandler) { // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} - // gofail: var raftBeforeApplySnap struct{} + if vraftBeforeApplySnap, __fpErr := __fp_raftBeforeApplySnap.Acquire(); __fpErr == nil { defer __fp_raftBeforeApplySnap.Release(); _, __fpTypeOK := vraftBeforeApplySnap.(struct{}); if !__fpTypeOK { goto __badTyperaftBeforeApplySnap} ; __badTyperaftBeforeApplySnap: __fp_raftBeforeApplySnap.BadType(vraftBeforeApplySnap, "struct{}"); }; r.raftStorage.ApplySnapshot(rd.Snapshot) r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index)) - // gofail: var raftAfterApplySnap struct{} + if vraftAfterApplySnap, __fpErr := __fp_raftAfterApplySnap.Acquire(); __fpErr == nil { defer __fp_raftAfterApplySnap.Release(); _, __fpTypeOK := vraftAfterApplySnap.(struct{}); if !__fpTypeOK { goto __badTyperaftAfterApplySnap} ; __badTyperaftAfterApplySnap: __fp_raftAfterApplySnap.BadType(vraftAfterApplySnap, "struct{}"); }; if err := r.storage.Release(rd.Snapshot); err != nil { r.lg.Fatal("failed to release Raft wal", zap.Error(err)) } - // gofail: var raftAfterWALRelease struct{} + if vraftAfterWALRelease, __fpErr := __fp_raftAfterWALRelease.Acquire(); __fpErr == nil { defer __fp_raftAfterWALRelease.Release(); _, __fpTypeOK := vraftAfterWALRelease.(struct{}); if !__fpTypeOK { goto __badTyperaftAfterWALRelease} ; __badTyperaftAfterWALRelease: __fp_raftAfterWALRelease.BadType(vraftAfterWALRelease, "struct{}"); }; } r.raftStorage.Append(rd.Entries) @@ -309,7 +309,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { } } - // gofail: var raftBeforeFollowerSend struct{} + if vraftBeforeFollowerSend, __fpErr := __fp_raftBeforeFollowerSend.Acquire(); __fpErr == nil { defer __fp_raftBeforeFollowerSend.Release(); _, __fpTypeOK := vraftBeforeFollowerSend.(struct{}); if !__fpTypeOK { goto __badTyperaftBeforeFollowerSend} ; __badTyperaftBeforeFollowerSend: __fp_raftBeforeFollowerSend.BadType(vraftBeforeFollowerSend, "struct{}"); }; r.transport.Send(msgs) } else { // leader already processed 'MsgSnap' and signaled diff --git a/server/go.mod b/server/go.mod index 23cebcfeec99..e5bc9d0284da 100644 --- a/server/go.mod +++ b/server/go.mod @@ -29,6 +29,7 @@ require ( go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 + go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 go.opentelemetry.io/otel v1.7.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 diff --git a/server/go.sum b/server/go.sum index 889ef8b865e6..7f566f8ea86d 100644 --- a/server/go.sum +++ b/server/go.sum @@ -286,6 +286,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/server/storage/backend/backend.fail.go b/server/storage/backend/backend.fail.go new file mode 100644 index 000000000000..5e5680fc5e47 --- /dev/null +++ b/server/storage/backend/backend.fail.go @@ -0,0 +1,8 @@ +// GENERATED BY GOFAIL. DO NOT EDIT. + +package backend + +import "go.etcd.io/gofail/runtime" + +var __fp_defragBeforeCopy *runtime.Failpoint = runtime.NewFailpoint("backend", "defragBeforeCopy", false) +var __fp_defragBeforeRename *runtime.Failpoint = runtime.NewFailpoint("backend", "defragBeforeRename", false) diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index f30d79062c8c..98dee567565e 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -501,7 +501,7 @@ func (b *backend) defrag() error { zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))), ) } - // gofail: var defragBeforeCopy struct{} + if vdefragBeforeCopy, __fpErr := __fp_defragBeforeCopy.Acquire(); __fpErr == nil { defer __fp_defragBeforeCopy.Release(); _, __fpTypeOK := vdefragBeforeCopy.(struct{}); if !__fpTypeOK { goto __badTypedefragBeforeCopy} ; __badTypedefragBeforeCopy: __fp_defragBeforeCopy.BadType(vdefragBeforeCopy, "struct{}"); }; err = defragdb(b.db, tmpdb, defragLimit) if err != nil { tmpdb.Close() @@ -519,7 +519,7 @@ func (b *backend) defrag() error { if err != nil { b.lg.Fatal("failed to close tmp database", zap.Error(err)) } - // gofail: var defragBeforeRename struct{} + if vdefragBeforeRename, __fpErr := __fp_defragBeforeRename.Acquire(); __fpErr == nil { defer __fp_defragBeforeRename.Release(); _, __fpTypeOK := vdefragBeforeRename.(struct{}); if !__fpTypeOK { goto __badTypedefragBeforeRename} ; __badTypedefragBeforeRename: __fp_defragBeforeRename.BadType(vdefragBeforeRename, "struct{}"); }; err = os.Rename(tdbp, dbp) if err != nil { b.lg.Fatal("failed to rename tmp database", zap.Error(err)) diff --git a/server/storage/backend/batch_tx.fail.go b/server/storage/backend/batch_tx.fail.go new file mode 100644 index 000000000000..a9f5f9ae99e3 --- /dev/null +++ b/server/storage/backend/batch_tx.fail.go @@ -0,0 +1,8 @@ +// GENERATED BY GOFAIL. DO NOT EDIT. + +package backend + +import "go.etcd.io/gofail/runtime" + +var __fp_beforeCommit *runtime.Failpoint = runtime.NewFailpoint("backend", "beforeCommit", false) +var __fp_afterCommit *runtime.Failpoint = runtime.NewFailpoint("backend", "afterCommit", false) diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index c8fa55954f64..ed70281be272 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -267,9 +267,9 @@ func (t *batchTx) commit(stop bool) { start := time.Now() - // gofail: var beforeCommit struct{} + if vbeforeCommit, __fpErr := __fp_beforeCommit.Acquire(); __fpErr == nil { defer __fp_beforeCommit.Release(); _, __fpTypeOK := vbeforeCommit.(struct{}); if !__fpTypeOK { goto __badTypebeforeCommit} ; __badTypebeforeCommit: __fp_beforeCommit.BadType(vbeforeCommit, "struct{}"); }; err := t.tx.Commit() - // gofail: var afterCommit struct{} + if vafterCommit, __fpErr := __fp_afterCommit.Acquire(); __fpErr == nil { defer __fp_afterCommit.Release(); _, __fpTypeOK := vafterCommit.(struct{}); if !__fpTypeOK { goto __badTypeafterCommit} ; __badTypeafterCommit: __fp_afterCommit.BadType(vafterCommit, "struct{}"); }; rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds()) spillSec.Observe(t.tx.Stats().SpillTime.Seconds()) diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go index 0af2cc1e8c3c..a3c499372194 100644 --- a/tests/framework/config/cluster.go +++ b/tests/framework/config/cluster.go @@ -33,4 +33,5 @@ type ClusterConfig struct { QuotaBackendBytes int64 DisableStrictReconfigCheck bool SnapshotCount int + Restart bool } diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index da694091612e..855a94e8d352 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -48,6 +48,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus QuotaBackendBytes: cfg.QuotaBackendBytes, DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck, SnapshotCount: cfg.SnapshotCount, + Restart: cfg.Restart, } switch cfg.ClientTLS { case config.NoTLS: @@ -89,6 +90,10 @@ func (c *e2eCluster) Client() Client { return e2eClient{e2e.NewEtcdctl(c.Cfg, c.EndpointsV3())} } +func (c *e2eCluster) Endpoints() []string { + return c.EndpointsV3() +} + func (c *e2eCluster) Members() (ms []Member) { for _, proc := range c.EtcdProcessCluster.Procs { ms = append(ms, e2eMember{EtcdProcess: proc, Cfg: c.Cfg}) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 250cc56ecf90..f7112efabdc5 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -178,6 +178,8 @@ type EtcdProcessClusterConfig struct { CorruptCheckTime time.Duration CompactHashCheckEnabled bool CompactHashCheckTime time.Duration + Restart bool + GoFailEnabled bool } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -357,12 +359,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* if cfg.CompactHashCheckTime != 0 { args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) } + envVars := map[string]string{} + if cfg.GoFailEnabled { + port = (i+1)*10000 + 2381 + envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", port) + } etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath, Args: args, - EnvVars: cfg.EnvVars, TlsArgs: cfg.TlsArgs(), DataDirPath: dataDirPath, KeepDataDir: cfg.KeepDataDir, @@ -371,6 +377,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* Acurl: curl, Murl: murl, InitialToken: cfg.InitialToken, + Restart: cfg.Restart, + EnvVars: envVars, } } diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 36042f287a13..1716198c632b 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -121,7 +121,7 @@ func (pp *proxyProc) start() error { if pp.proc != nil { panic("already started") } - proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name) + proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name, false) if err != nil { return err } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index f4c85990714b..949f05840e3b 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -22,10 +22,9 @@ import ( "testing" "time" - "go.uber.org/zap" - "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/expect" + "go.uber.org/zap" ) var ( @@ -80,6 +79,7 @@ type EtcdServerProcessConfig struct { InitialToken string InitialCluster string + Restart bool } func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { @@ -104,7 +104,7 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { panic("already started") } ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) - proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name) + proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name, ep.cfg.Restart) if err != nil { return err } diff --git a/tests/framework/e2e/etcd_spawn.go b/tests/framework/e2e/etcd_spawn.go index ab86df150a21..adc2c0183b45 100644 --- a/tests/framework/e2e/etcd_spawn.go +++ b/tests/framework/e2e/etcd_spawn.go @@ -27,5 +27,5 @@ func SpawnCmd(args []string, envVars map[string]string) (*expect.ExpectProcess, } func SpawnNamedCmd(processName string, args []string, envVars map[string]string) (*expect.ExpectProcess, error) { - return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName) + return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName, false) } diff --git a/tests/framework/e2e/etcd_spawn_cov.go b/tests/framework/e2e/etcd_spawn_cov.go index 84d680385ef1..7599d1937278 100644 --- a/tests/framework/e2e/etcd_spawn_cov.go +++ b/tests/framework/e2e/etcd_spawn_cov.go @@ -68,7 +68,7 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string zap.String("working-dir", wd), zap.String("name", name), zap.Strings("environment-variables", env)) - return expect.NewExpectWithEnv(cmd, allArgs, env, name) + return expect.NewExpectWithEnv(cmd, allArgs, env, name, false) } func getCovArgs() ([]string, error) { diff --git a/tests/framework/e2e/etcd_spawn_nocov.go b/tests/framework/e2e/etcd_spawn_nocov.go index d050e8b52218..ee2089679ac0 100644 --- a/tests/framework/e2e/etcd_spawn_nocov.go +++ b/tests/framework/e2e/etcd_spawn_nocov.go @@ -28,7 +28,7 @@ import ( const noOutputLineCount = 0 // regular binaries emit no extra lines -func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string) (*expect.ExpectProcess, error) { +func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string, restart bool) (*expect.ExpectProcess, error) { wd, err := os.Getwd() if err != nil { return nil, err @@ -40,13 +40,17 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string zap.Strings("args", args), zap.String("working-dir", wd), zap.String("name", name), - zap.Strings("environment-variables", env)) - return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name) + zap.Strings("environment-variables", env), + zap.Bool("restart", restart), + ) + return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name, restart) } lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.String("name", name), - zap.Strings("environment-variables", env)) - return expect.NewExpectWithEnv(args[0], args[1:], env, name) + zap.Strings("environment-variables", env), + zap.Bool("restart", restart), + ) + return expect.NewExpectWithEnv(args[0], args[1:], env, name, restart) } diff --git a/tests/framework/interface.go b/tests/framework/interface.go index f3ff33de00fe..d560fb1f0ad8 100644 --- a/tests/framework/interface.go +++ b/tests/framework/interface.go @@ -33,6 +33,7 @@ type Cluster interface { Client() Client WaitLeader(t testing.TB) int Close() error + Endpoints() []string } type Member interface { diff --git a/tests/go.mod b/tests/go.mod index d806421d986d..a36b163f2586 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -15,6 +15,7 @@ replace ( ) require ( + github.com/anishathalye/porcupine v0.1.2 github.com/coreos/go-semver v0.3.0 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 @@ -80,6 +81,7 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.6 // indirect + go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect diff --git a/tests/go.sum b/tests/go.sum index 7c4432c9c848..4a60147d36cf 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -43,6 +43,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2 h1:eqWNeLcnTzXt6usipDJ4RFn6XOWqY5wEqBYVG3yFLSE= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -311,6 +313,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/tests/linearizability/issue14370_test.go b/tests/linearizability/issue14370_test.go new file mode 100644 index 000000000000..0ffc979998a8 --- /dev/null +++ b/tests/linearizability/issue14370_test.go @@ -0,0 +1,221 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "bytes" + "context" + "fmt" + "net/http" + "sync" + "testing" + "time" + + "github.com/anishathalye/porcupine" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/framework/testutils" + "go.uber.org/zap" +) + +const maxOperationsPerClient = 1000000 + +var httpClient = http.Client{ + Timeout: 10 * time.Millisecond, +} +var waitBetweenTriggers = time.Second + +func TestIssue14370(t *testing.T) { + testRunner.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) + defer cancel() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{ + InitialToken: "new", + ClusterSize: 1, + Restart: true, + GoFailEnabled: true, + }) + if err != nil { + t.Fatal(err) + } + defer clus.Close() + + // Validate that gofail is setup correctly + err = triggerFailpoint("etcdserver/raftBeforeSave", "sleep(1)") + if err != nil { + t.Errorf("Failed triggering a failpoint, err: %v", err) + t.Log("Looks like failpoints are not setup corrctly. Please make sure that etcd binaries where compiled with FAILPOINTS=true.") + return + } + + // Increasing number of failpoints number of tries making test more accurate but requiring more time. + failpointsCount := 60 + minimalQPS := 100.0 + allFailpointsInjected := make(chan struct{}) + go trigger14370Failpoints(ctx, t, failpointsCount, allFailpointsInjected) + start := time.Now() + operations := simulateTraffic(ctx, t, clus, 1, 8, allFailpointsInjected) + end := time.Now() + + t.Logf("Recorded %d operations", len(operations)) + qps := float64(len(operations)) / float64(end.Sub(start)) * float64(time.Second) + t.Logf("Average traffic: %f qps", qps) + if qps < minimalQPS { + t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", minimalQPS, qps) + } + linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0) + if linearizable != porcupine.Ok { + t.Error("Model is not linearizable, saving visualization to /tmp/results.html") + err := porcupine.VisualizePath(etcdModel, info, "/tmp/results.html") + if err != nil { + t.Errorf("Failed to visualize, err: %v", err) + } + } +} + +func trigger14370Failpoints(ctx context.Context, t *testing.T, failpointsCount int, finished chan<- struct{}) { + triggers := 0 + + time.Sleep(waitBetweenTriggers) + ctx, cancel := context.WithTimeout(ctx, 2*time.Duration(failpointsCount)*waitBetweenTriggers) + defer cancel() + testutils.ExecuteUntil(ctx, t, func() { + var err error + for { + // TODO: Also trigger "go.etcd.io/etcd/server/storage/backend/beforeCommit=sleep" and "go.etcd.io/etcd/server/etcdserver/raftBeforeLeaderSend" in same request + err = triggerFailpoint("etcdserver/raftBeforeSave", "panic") + if err != nil { + t.Log(err) + continue + } + triggers++ + if triggers >= failpointsCount { + break + } + time.Sleep(waitBetweenTriggers) + } + }) + time.Sleep(waitBetweenTriggers) + close(finished) +} + +func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, memberCount int, clientCount int, finished <-chan struct{}) (operations []porcupine.Operation) { + startTime := time.Now() + mux := sync.Mutex{} + endpoints := clus.EndpointsV3() + if len(endpoints) != memberCount { + t.Fatalf("Unexpected number of endpoints, got %d, expected %d", len(endpoints), memberCount) + } + + testutils.ExecuteUntil(ctx, t, func() { + wg := sync.WaitGroup{} + for i := 0; i < clientCount; i++ { + cc, err := clientv3.New(clientv3.Config{ + Endpoints: []string{endpoints[i%memberCount]}, + Logger: zap.NewNop(), + DialTimeout: 5 * time.Millisecond, + DialKeepAliveTime: 1 * time.Millisecond, + DialKeepAliveTimeout: 5 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + defer cc.Close() + wg.Add(1) + go func(clientId int) { + defer wg.Done() + op, err := putGet(cc, clientId, startTime, finished) + if err != nil { + t.Error(err) + return + } + mux.Lock() + operations = append(operations, op...) + mux.Unlock() + }(i) + } + + wg.Wait() + }) + return operations +} + +func triggerFailpoint(failpoint, payload string) error { + // TODO: Send failpoints to different members + r, err := http.NewRequest("PUT", "http://127.0.0.1:12381/"+failpoint, bytes.NewBuffer([]byte(payload))) + if err != nil { + return fmt.Errorf("failed to trigger failpoint: %q, err: %v", failpoint, err) + } + resp, err := httpClient.Do(r) + if err != nil { + return fmt.Errorf("failed to trigger failpoint: %q, err: %v", failpoint, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("bad response for failpoint: %q, status: %d", failpoint, resp.StatusCode) + } + return nil +} + +func putGet(cc *clientv3.Client, clientId int, startTime time.Time, finished <-chan struct{}) (operations []porcupine.Operation, err error) { + id := maxOperationsPerClient * clientId + ctx := context.Background() + key := "key" + + for i := 0; i < maxOperationsPerClient; { + select { + case <-finished: + return operations, nil + default: + } + getStartTime := time.Now() + getCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + resp, err := cc.Get(getCtx, key) + cancel() + getResponseTime := time.Now() + if err != nil { + continue + } + var readData string + if len(resp.Kvs) == 1 { + readData = string(resp.Kvs[0].Value) + } + operations = append(operations, porcupine.Operation{ + ClientId: clientId, + Input: etcdRequest{op: Read}, + Call: getStartTime.Sub(startTime).Nanoseconds(), + Output: etcdResponse{readData: readData}, + Return: getResponseTime.Sub(startTime).Nanoseconds(), + }) + putData := fmt.Sprintf("%d", id+i) + putStartTime := time.Now() + putCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + _, err = cc.Put(putCtx, key, putData) + cancel() + putResponseTime := time.Now() + operations = append(operations, porcupine.Operation{ + ClientId: clientId, + Input: etcdRequest{op: Put, writeData: putData}, + Call: putStartTime.Sub(startTime).Nanoseconds(), + Output: etcdResponse{err: err}, + Return: putResponseTime.Sub(startTime).Nanoseconds(), + }) + if err != nil { + continue + } + i++ + } + return operations, nil +} diff --git a/tests/linearizability/main_test.go b/tests/linearizability/main_test.go new file mode 100644 index 000000000000..61f84b5322cc --- /dev/null +++ b/tests/linearizability/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "testing" + + "go.etcd.io/etcd/tests/v3/framework" +) + +var testRunner = framework.E2eTestRunner + +func TestMain(m *testing.M) { + testRunner.TestMain(m) +} diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go new file mode 100644 index 000000000000..4d817d5527b6 --- /dev/null +++ b/tests/linearizability/model.go @@ -0,0 +1,112 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "encoding/json" + "fmt" + + "github.com/anishathalye/porcupine" +) + +type Operation int8 + +const Read Operation = 0 +const Put Operation = 1 + +type etcdRequest struct { + op Operation + writeData string +} + +type etcdResponse struct { + readData string + err error +} + +type EtcdState struct { + Value string + FailedWrites []string +} + +var etcdModel = porcupine.Model{ + Init: func() interface{} { return "{}" }, + Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { + stateString := st.(string) + var state EtcdState + err := json.Unmarshal([]byte(stateString), &state) + if err != nil { + panic(err) + } + request := in.(etcdRequest) + response := out.(etcdResponse) + ok, state := step(state, request, response) + data, err := json.Marshal(state) + if err != nil { + panic(err) + } + return ok, string(data) + }, + DescribeOperation: func(in, out interface{}) string { + request := in.(etcdRequest) + response := out.(etcdResponse) + var call, args, resp string + switch request.op { + case Read: + call = "read" + if response.err != nil { + resp = response.err.Error() + } else { + resp = response.readData + } + case Put: + call = "write" + args = request.writeData + if response.err != nil { + resp = response.err.Error() + } else { + resp = "ok" + } + default: + return "" + } + return fmt.Sprintf("%s(%q) -> %s", call, args, resp) + }, +} + +func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { + var ok bool + switch request.op { + case Read: + ok = state.Value == response.readData + if !ok { + for i, write := range state.FailedWrites { + if write == response.readData { + ok = true + state = EtcdState{Value: write, FailedWrites: append(state.FailedWrites[:i], state.FailedWrites[i+1:]...)} + break + } + } + } + case Put: + if response.err == nil { + state.Value = request.writeData + } else { + state.FailedWrites = append(state.FailedWrites, request.writeData) + } + ok = true + } + return ok, state +}