From fa1a0e5a1a605e7a6f4302e8fc7264767e113246 Mon Sep 17 00:00:00 2001
From: Vincent Lee
Date: Mon, 8 Jan 2018 23:43:04 +0800
Subject: [PATCH] raft: raft learners should be returned after applyConfChange

---
 CHANGELOG.md | 36 ++++++--
 clientv3/client.go | 23 ++---
 clientv3/integration/server_shutdown_test.go | 5 +-
 clientv3/leasing/kv.go | 2 +-
 clientv3/retry.go | 8 +-
 .../grpc-gateway/runtime/convert.go | 18 ++++
 .../grpc-gateway/runtime/handler.go | 34 +++++--
 .../grpc-gateway/runtime/marshal_json.go | 5 +
 .../grpc-gateway/runtime/marshal_jsonpb.go | 5 +
 .../grpc-gateway/runtime/marshal_proto.go | 62 +++++++++++++
 .../grpc-gateway/runtime/marshaler.go | 6 ++
 etcdctl/README.md | 2 +
 etcdctl/ctlv3/command/snapshot_command.go | 7 +-
 etcdserver/api/v3rpc/lease.go | 12 ++-
 etcdserver/api/v3rpc/rpctypes/error_test.go | 6 +-
 etcdserver/api/v3rpc/util.go | 13 +++
 etcdserver/api/v3rpc/watch.go | 24 ++++-
 glide.lock | 6 +-
 glide.yaml | 2 +-
 mvcc/backend/backend.go | 2 +-
 raft/log_unstable.go | 2 +-
 raft/node.go | 10 +-
 raft/node_test.go | 58 +++++++++++-
 raft/raft.go | 44 +++++----
 raft/raft_test.go | 91 +++++++------------
 raft/rawnode.go | 6 +-
 scripts/genproto.sh | 2 +-
 test | 3 +
 .../command/lease_renewer_command.go | 3 +-
 .../etcd-tester/key_stresser.go | 90 +++++++++++++++++-
 tools/functional-tester/etcd-tester/main.go | 22 +++--
 .../functional-tester/etcd-tester/stresser.go | 22 +++--
 32 files changed, 477 insertions(+), 154 deletions(-)
 create mode 100644 cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_proto.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index edd5b5085b01..7a0d6924d34d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,18 @@
+## v3.4.0 (TBD)
+
+**v3.4.0 is not yet released.**
+
+### Added(`etcd`)
+
+- Add [`watch_id` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9065), allowing user-provided watch IDs to be passed to `mvcc`.
+  - Corresponding `watch_id` is returned via `etcdserverpb.WatchResponse`, if any.
+
+### Improved(`etcd/raft`)
+
+- [Improve `becomeLeader` and `stepLeader`](https://github.com/coreos/etcd/pull/9073) by keeping track of latest `pb.EntryConfChange` index.
+  - Previously, the `pendingConf` boolean field was set by scanning the entire tail of the log, which could delay heartbeat sends.
+
+
 ## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0) (2018-01-??)
 
 **v3.3.0 is not yet released; expected to be released in January 2018.**
@@ -25,7 +40,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0-rc.0)
 - Require [Go 1.9+](https://github.com/coreos/etcd/issues/6174).
 - Compile with *Go 1.9.2*.
 - Deprecate [`golang.org/x/net/context`](https://github.com/coreos/etcd/pull/8511).
-- Require [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) [**`v1.7.4`**](https://github.com/grpc/grpc-go/releases/tag/v1.7.4) or [**`v1.7.5+`**](https://github.com/grpc/grpc-go/releases/tag/v1.7.5):
+- Require [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) [**`v1.7.4`**](https://github.com/grpc/grpc-go/releases/tag/v1.7.4) or [**`v1.7.5+`**](https://github.com/grpc/grpc-go/releases/tag/v1.7.5).
 - Deprecate [`metadata.Incoming/OutgoingContext`](https://github.com/coreos/etcd/pull/7896).
 - Deprecate `grpclog.Logger`, upgrade to [`grpclog.LoggerV2`](https://github.com/coreos/etcd/pull/8533).
- Deprecate [`grpc.ErrClientConnTimeout`](https://github.com/coreos/etcd/pull/8505) errors in `clientv3`. @@ -106,6 +121,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0-rc.0) - Add [`lease keep-alive --once`](https://github.com/coreos/etcd/pull/8775) flag. - Make [`lease timetolive LEASE_ID`](https://github.com/coreos/etcd/issues/9028) on expired lease print [`lease LEASE_ID already expired`](https://github.com/coreos/etcd/pull/9047). - <=3.2 prints `lease LEASE_ID granted with TTL(0s), remaining(-1s)`. +- Add [`snapshot restore --wal-dir`](https://github.com/coreos/etcd/pull/9124) flag. - Add [`defrag --data-dir`](https://github.com/coreos/etcd/pull/8367) flag. - Add [`move-leader`](https://github.com/coreos/etcd/pull/8153) command. - Add [`endpoint hashkv`](https://github.com/coreos/etcd/pull/8351) command. @@ -131,7 +147,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0-rc.0) ### Added(`grpc-proxy`) -- Add [`grpc-proxy start --experimental-leasing-prefix`](https://github.com/coreos/etcd/pull/8341) flag: +- Add [`grpc-proxy start --experimental-leasing-prefix`](https://github.com/coreos/etcd/pull/8341) flag. - For disconnected linearized reads. - Based on [V system leasing](https://github.com/coreos/etcd/issues/6065). - See ["Disconnected consistent reads with etcd" blog post](https://coreos.com/blog/coreos-labs-disconnected-consistent-reads-with-etcd). @@ -160,7 +176,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0-rc.0) ### Added/Fixed(Security/Auth) - Add [CRL based connection rejection](https://github.com/coreos/etcd/pull/8124) to manage [revoked certs](https://github.com/coreos/etcd/issues/4034). -- Document [TLS authentication changes](https://github.com/coreos/etcd/pull/8895): +- Document [TLS authentication changes](https://github.com/coreos/etcd/pull/8895). - [Server accepts connections if IP matches, without checking DNS entries](https://github.com/coreos/etcd/pull/8223). For instance, if peer cert contains IP addresses and DNS names in Subject Alternative Name (SAN) field, and the remote IP address matches one of those IP addresses, server just accepts connection without further checking the DNS names. - [Server supports reverse-lookup on wildcard DNS `SAN`](https://github.com/coreos/etcd/pull/8281). For instance, if peer cert contains only DNS names (no IP addresses) in Subject Alternative Name (SAN) field, server first reverse-lookups the remote IP address to get a list of names mapping to that address (e.g. `nslookup IPADDR`). Then accepts the connection if those names have a matching name with peer cert's DNS names (either by exact or wildcard match). If none is matched, server forward-lookups each DNS entry in peer cert (e.g. look up `example.default.svc` when the entry is `*.example.default.svc`), and accepts connection only when the host's resolved addresses have the matching IP address with the peer's remote IP address. - Add [`etcd --peer-require-cn`](https://github.com/coreos/etcd/pull/8616) flag. 
@@ -179,7 +195,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0-rc.0) ### Fixed(v3) -- Fix [range/put/delete operation metrics](https://github.com/coreos/etcd/pull/8054) with transaction: +- Fix [range/put/delete operation metrics](https://github.com/coreos/etcd/pull/8054) with transaction. - `etcd_debugging_mvcc_range_total` - `etcd_debugging_mvcc_put_total` - `etcd_debugging_mvcc_delete_total` @@ -210,8 +226,8 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0-rc.0) ### Other - Support previous two minor versions (see our [new release policy](https://github.com/coreos/etcd/pull/8805)). -- `v3.3.x` is the last release cycle that supports `ACI`: - - AppC was [officially suspended](https://github.com/appc/spec#-disclaimer-), as of late 2016. +- `v3.3.x` is the last release cycle that supports `ACI`. + - [AppC was officially suspended](https://github.com/appc/spec#-disclaimer-), as of late 2016. - [`acbuild`](https://github.com/containers/build#this-project-is-currently-unmaintained) is not maintained anymore. - `*.aci` files won't be available from etcd `v3.4` release. - Add container registry [`gcr.io/etcd-development/etcd`](https://gcr.io/etcd-development/etcd). @@ -257,7 +273,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.10...v3.2.11) and ### Fixed -- Fix racey grpc-go's server handler transport `WriteStatus` call to prevent [TLS-enabled etcd server crash](https://github.com/coreos/etcd/issues/8904): +- Fix racey grpc-go's server handler transport `WriteStatus` call to prevent [TLS-enabled etcd server crash](https://github.com/coreos/etcd/issues/8904). - Upgrade [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) `v1.7.3` to `v1.7.4`. - Add [gRPC RPC failure warnings](https://github.com/coreos/etcd/pull/8939) to help debug such issues in the future. - Remove `--listen-metrics-urls` flag in monitoring document (non-released in `v3.2.x`, planned for `v3.3.x`). @@ -284,7 +300,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.9...v3.2.10) and ### Fixed - Replace backend key-value database `boltdb/bolt` with [`coreos/bbolt`](https://github.com/coreos/bbolt/releases) to address [backend database size issue](https://github.com/coreos/etcd/issues/8009). -- Fix `clientv3` balancer to handle [network partitions](https://github.com/coreos/etcd/issues/8711): +- Fix `clientv3` balancer to handle [network partitions](https://github.com/coreos/etcd/issues/8711). - Upgrade [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) `v1.2.1` to `v1.7.3`. - Upgrade [`github.com/grpc-ecosystem/grpc-gateway`](https://github.com/grpc-ecosystem/grpc-gateway/releases) `v1.2` to `v1.3`. - Revert [discovery SRV auth `ServerName` with `*.{ROOT_DOMAIN}`](https://github.com/coreos/etcd/pull/8651) to support non-wildcard subject alternative names in the certs (see [issue #8445](https://github.com/coreos/etcd/issues/8445) for more contexts). @@ -404,7 +420,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.2.1...v3.2.2) and [ ### Fixed -- Use user-provided listen address to connect to gRPC gateway: +- Use user-provided listen address to connect to gRPC gateway. - `net.Listener` rewrites IPv4 0.0.0.0 to IPv6 [::], breaking IPv6 disabled hosts. - Only v3.2.0, v3.2.1 are affected. 
- Accept connection with matched IP SAN but no DNS match. @@ -618,7 +634,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.0.0...v3.1.0) and [ ### Changed -- Deprecated following gRPC metrics in favor of [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus): +- Deprecated following gRPC metrics in favor of [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus). - `etcd_grpc_requests_total` - `etcd_grpc_requests_failed_total` - `etcd_grpc_active_streams` diff --git a/clientv3/client.go b/clientv3/client.go index a2c313d35b53..685401084da6 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -537,18 +537,19 @@ func toErr(ctx context.Context, err error) error { if _, ok := err.(rpctypes.EtcdError); ok { return err } - ev, _ := status.FromError(err) - code := ev.Code() - switch code { - case codes.DeadlineExceeded: - fallthrough - case codes.Canceled: - if ctx.Err() != nil { - err = ctx.Err() + if ev, ok := status.FromError(err); ok { + code := ev.Code() + switch code { + case codes.DeadlineExceeded: + fallthrough + case codes.Canceled: + if ctx.Err() != nil { + err = ctx.Err() + } + case codes.Unavailable: + case codes.FailedPrecondition: + err = grpc.ErrClientConnClosing } - case codes.Unavailable: - case codes.FailedPrecondition: - err = grpc.ErrClientConnClosing } return err } diff --git a/clientv3/integration/server_shutdown_test.go b/clientv3/integration/server_shutdown_test.go index 8a89acc58768..dcda528dbe45 100644 --- a/clientv3/integration/server_shutdown_test.go +++ b/clientv3/integration/server_shutdown_test.go @@ -361,7 +361,10 @@ func isServerCtxTimeout(err error) bool { if err == nil { return false } - ev, _ := status.FromError(err) + ev, ok := status.FromError(err) + if !ok { + return false + } code := ev.Code() return code == codes.DeadlineExceeded && strings.Contains(err.Error(), "context deadline exceeded") } diff --git a/clientv3/leasing/kv.go b/clientv3/leasing/kv.go index 5a5e2312b726..051a8fcebbb4 100644 --- a/clientv3/leasing/kv.go +++ b/clientv3/leasing/kv.go @@ -285,7 +285,7 @@ func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.Tx if _, ok := err.(rpctypes.EtcdError); ok { return nil, err } - if ev, _ := status.FromError(err); ev.Code() != codes.Unavailable { + if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable { return nil, err } } diff --git a/clientv3/retry.go b/clientv3/retry.go index 7f89ba641a44..f923f74ba26a 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -52,7 +52,10 @@ func isRepeatableStopError(err error) bool { return true } // only retry if unavailable - ev, _ := status.FromError(err) + ev, ok := status.FromError(err) + if !ok { + return false + } return ev.Code() != codes.Unavailable } @@ -68,8 +71,7 @@ func isRepeatableStopError(err error) bool { // Returning "true" means retry should stop, otherwise it violates // write-at-most-once semantics. 
func isNonRepeatableStopError(err error) bool { - ev, _ := status.FromError(err) - if ev.Code() != codes.Unavailable { + if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable { return true } desc := rpctypes.ErrorDesc(err) diff --git a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/convert.go b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/convert.go index 1af5cc4ebdd4..f89318641004 100644 --- a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/convert.go +++ b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/convert.go @@ -2,6 +2,10 @@ package runtime import ( "strconv" + + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/ptypes/duration" + "github.com/golang/protobuf/ptypes/timestamp" ) // String just returns the given string. @@ -56,3 +60,17 @@ func Uint32(val string) (uint32, error) { } return uint32(i), nil } + +// Timestamp converts the given RFC3339 formatted string into a timestamp.Timestamp. +func Timestamp(val string) (*timestamp.Timestamp, error) { + var r *timestamp.Timestamp + err := jsonpb.UnmarshalString(val, r) + return r, err +} + +// Duration converts the given string into a timestamp.Duration. +func Duration(val string) (*duration.Duration, error) { + var r *duration.Duration + err := jsonpb.UnmarshalString(val, r) + return r, err +} diff --git a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/handler.go b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/handler.go index ae6a5d551cfb..1770b85344d2 100644 --- a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/handler.go +++ b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/handler.go @@ -34,34 +34,47 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal w.Header().Set("Transfer-Encoding", "chunked") w.Header().Set("Content-Type", marshaler.ContentType()) if err := handleForwardResponseOptions(ctx, w, nil, opts); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + HTTPError(ctx, mux, marshaler, w, req, err) return } - w.WriteHeader(http.StatusOK) - f.Flush() + + var delimiter []byte + if d, ok := marshaler.(Delimited); ok { + delimiter = d.Delimiter() + } else { + delimiter = []byte("\n") + } + + var wroteHeader bool for { resp, err := recv() if err == io.EOF { return } if err != nil { - handleForwardResponseStreamError(marshaler, w, err) + handleForwardResponseStreamError(wroteHeader, marshaler, w, err) return } if err := handleForwardResponseOptions(ctx, w, resp, opts); err != nil { - handleForwardResponseStreamError(marshaler, w, err) + handleForwardResponseStreamError(wroteHeader, marshaler, w, err) return } buf, err := marshaler.Marshal(streamChunk(resp, nil)) if err != nil { grpclog.Printf("Failed to marshal response chunk: %v", err) + handleForwardResponseStreamError(wroteHeader, marshaler, w, err) return } if _, err = w.Write(buf); err != nil { grpclog.Printf("Failed to send response chunk: %v", err) return } + wroteHeader = true + if _, err = w.Write(delimiter); err != nil { + grpclog.Printf("Failed to send delimiter chunk: %v", err) + return + } f.Flush() } } @@ -134,13 +147,20 @@ func handleForwardResponseOptions(ctx context.Context, w http.ResponseWriter, re return nil } -func handleForwardResponseStreamError(marshaler Marshaler, w http.ResponseWriter, err error) { +func handleForwardResponseStreamError(wroteHeader bool, marshaler Marshaler, w 
http.ResponseWriter, err error) { buf, merr := marshaler.Marshal(streamChunk(nil, err)) if merr != nil { grpclog.Printf("Failed to marshal an error: %v", merr) return } - if _, werr := fmt.Fprintf(w, "%s\n", buf); werr != nil { + if !wroteHeader { + s, ok := status.FromError(err) + if !ok { + s = status.New(codes.Unknown, err.Error()) + } + w.WriteHeader(HTTPStatusFromCode(s.Code())) + } + if _, werr := w.Write(buf); werr != nil { grpclog.Printf("Failed to notify error to client: %v", werr) return } diff --git a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_json.go b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_json.go index 0acd2ca29ef0..b3a21418be48 100644 --- a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_json.go +++ b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_json.go @@ -35,3 +35,8 @@ func (j *JSONBuiltin) NewDecoder(r io.Reader) Decoder { func (j *JSONBuiltin) NewEncoder(w io.Writer) Encoder { return json.NewEncoder(w) } + +// Delimiter for newline encoded JSON streams. +func (j *JSONBuiltin) Delimiter() []byte { + return []byte("\n") +} diff --git a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_jsonpb.go b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_jsonpb.go index 49f13f7fc74c..d42cc593e510 100644 --- a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_jsonpb.go +++ b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_jsonpb.go @@ -182,3 +182,8 @@ type protoEnum interface { } var typeProtoMessage = reflect.TypeOf((*proto.Message)(nil)).Elem() + +// Delimiter for newline encoded JSON streams. +func (j *JSONPb) Delimiter() []byte { + return []byte("\n") +} diff --git a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_proto.go b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_proto.go new file mode 100644 index 000000000000..f65d1a2676b8 --- /dev/null +++ b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshal_proto.go @@ -0,0 +1,62 @@ +package runtime + +import ( + "io" + + "errors" + "github.com/golang/protobuf/proto" + "io/ioutil" +) + +// ProtoMarshaller is a Marshaller which marshals/unmarshals into/from serialize proto bytes +type ProtoMarshaller struct{} + +// ContentType always returns "application/octet-stream". +func (*ProtoMarshaller) ContentType() string { + return "application/octet-stream" +} + +// Marshal marshals "value" into Proto +func (*ProtoMarshaller) Marshal(value interface{}) ([]byte, error) { + message, ok := value.(proto.Message) + if !ok { + return nil, errors.New("unable to marshal non proto field") + } + return proto.Marshal(message) +} + +// Unmarshal unmarshals proto "data" into "value" +func (*ProtoMarshaller) Unmarshal(data []byte, value interface{}) error { + message, ok := value.(proto.Message) + if !ok { + return errors.New("unable to unmarshal non proto field") + } + return proto.Unmarshal(data, message) +} + +// NewDecoder returns a Decoder which reads proto stream from "reader". +func (marshaller *ProtoMarshaller) NewDecoder(reader io.Reader) Decoder { + return DecoderFunc(func(value interface{}) error { + buffer, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + return marshaller.Unmarshal(buffer, value) + }) +} + +// NewEncoder returns an Encoder which writes proto stream into "writer". 
+func (marshaller *ProtoMarshaller) NewEncoder(writer io.Writer) Encoder { + return EncoderFunc(func(value interface{}) error { + buffer, err := marshaller.Marshal(value) + if err != nil { + return err + } + _, err = writer.Write(buffer) + if err != nil { + return err + } + + return nil + }) +} diff --git a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshaler.go b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshaler.go index 6d434f13cb46..98fe6e88ac59 100644 --- a/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshaler.go +++ b/cmd/vendor/github.com/grpc-ecosystem/grpc-gateway/runtime/marshaler.go @@ -40,3 +40,9 @@ type EncoderFunc func(v interface{}) error // Encode delegates invocations to the underlying function itself. func (f EncoderFunc) Encode(v interface{}) error { return f(v) } + +// Delimited defines the streaming delimiter. +type Delimited interface { + // Delimiter returns the record seperator for the stream. + Delimiter() []byte +} diff --git a/etcdctl/README.md b/etcdctl/README.md index 54e777ba717c..17a14ce5cfb1 100644 --- a/etcdctl/README.md +++ b/etcdctl/README.md @@ -874,6 +874,8 @@ The snapshot restore options closely resemble to those used in the `etcd` comman - data-dir -- Path to the data directory. Uses \.etcd if none given. +- wal-dir -- Path to the WAL directory. Uses data directory if none given. + - initial-cluster -- The initial cluster configuration for the restored etcd cluster. - initial-cluster-token -- Initial cluster token for the restored etcd cluster. diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index 026339bc1f2d..2929cd94e8c7 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -56,6 +56,7 @@ var ( restoreCluster string restoreClusterToken string restoreDataDir string + restoreWalDir string restorePeerURLs string restoreName string skipHashCheck bool @@ -99,6 +100,7 @@ func NewSnapshotRestoreCommand() *cobra.Command { Run: snapshotRestoreCommandFunc, } cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory") + cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)") cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap") cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap") cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster") @@ -187,7 +189,10 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) { basedir = restoreName + ".etcd" } - waldir := filepath.Join(basedir, "member", "wal") + waldir := restoreWalDir + if waldir == "" { + waldir = filepath.Join(basedir, "member", "wal") + } snapdir := filepath.Join(basedir, "member", "snap") if _, err := os.Stat(basedir); err == nil { diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index 13602b4e652c..5b4f2b142281 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -107,7 +107,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro return nil } if err != nil { - plog.Debugf("failed to receive lease keepalive request from gRPC stream 
(%q)", err.Error()) + if isClientCtxErr(stream.Context().Err(), err) { + plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) + } return err } @@ -133,7 +137,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro resp.TTL = ttl err = stream.Send(resp) if err != nil { - plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(stream.Context().Err(), err) { + plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) + } return err } } diff --git a/etcdserver/api/v3rpc/rpctypes/error_test.go b/etcdserver/api/v3rpc/rpctypes/error_test.go index e1b3968227ac..525d96983112 100644 --- a/etcdserver/api/v3rpc/rpctypes/error_test.go +++ b/etcdserver/api/v3rpc/rpctypes/error_test.go @@ -29,16 +29,14 @@ func TestConvert(t *testing.T) { if e1.Error() != e2.Error() { t.Fatalf("expected %q == %q", e1.Error(), e2.Error()) } - ev1, _ := status.FromError(e1) - if ev1.Code() != e3.(EtcdError).Code() { + if ev1, ok := status.FromError(e1); ok && ev1.Code() != e3.(EtcdError).Code() { t.Fatalf("expected them to be equal, got %v / %v", ev1.Code(), e3.(EtcdError).Code()) } if e1.Error() == e3.Error() { t.Fatalf("expected %q != %q", e1.Error(), e3.Error()) } - ev2, _ := status.FromError(e2) - if ev2.Code() != e3.(EtcdError).Code() { + if ev2, ok := status.FromError(e2); ok && ev2.Code() != e3.(EtcdError).Code() { t.Fatalf("expected them to be equal, got %v / %v", ev2.Code(), e3.(EtcdError).Code()) } } diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 71a549b0555f..e4bb85ae7552 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -81,3 +81,16 @@ func togRPCError(err error) error { } return grpcErr } + +func isClientCtxErr(ctxErr error, err error) bool { + if ctxErr != nil { + return true + } + + ev, ok := status.FromError(err) + if !ok { + return false + } + code := ev.Code() + return code == codes.Canceled || code == codes.DeadlineExceeded +} diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index e8b19565a398..0d5a4c2fd1fe 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -140,7 +140,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { // deadlock when calling sws.close(). 
go func() { if rerr := sws.recvLoop(); rerr != nil { - plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error()) + if isClientCtxErr(stream.Context().Err(), rerr) { + plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error()) + } else { + plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error()) + } errc <- rerr } }() @@ -342,7 +346,11 @@ func (sws *serverWatchStream) sendLoop() { mvcc.ReportEventReceived(len(evs)) if err := sws.gRPCStream.Send(wr); err != nil { - plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(sws.gRPCStream.Context().Err(), err) { + plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error()) + } return } @@ -359,7 +367,11 @@ func (sws *serverWatchStream) sendLoop() { } if err := sws.gRPCStream.Send(c); err != nil { - plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(sws.gRPCStream.Context().Err(), err) { + plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error()) + } return } @@ -375,7 +387,11 @@ func (sws *serverWatchStream) sendLoop() { for _, v := range pending[wid] { mvcc.ReportEventReceived(len(v.Events)) if err := sws.gRPCStream.Send(v); err != nil { - plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(sws.gRPCStream.Context().Err(), err) { + plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error()) + } return } } diff --git a/glide.lock b/glide.lock index 0188e0d68c18..bc499956fef1 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 23d3b011a2e95e7c285287f62d6c7404ee42ccf6f81055cd24ab58da51a10f42 -updated: 2017-12-18T15:54:23.990881-08:00 +hash: 717378e57448533f1e2b054fe152b3f51e5e397292527c82ab24fb2c6c7d2a8f +updated: 2018-01-09T12:39:45.249170188-08:00 imports: - name: github.com/beorn7/perks version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 @@ -61,7 +61,7 @@ imports: - name: github.com/grpc-ecosystem/go-grpc-prometheus version: 0dafe0d496ea71181bf2dd039e7e3f44b6bd11a7 - name: github.com/grpc-ecosystem/grpc-gateway - version: 8cc3a55af3bcf171a1c23a90c4df9cf591706104 + version: 07f5e79768022f9a3265235f0db4ac8c3f675fec subpackages: - runtime - runtime/internal diff --git a/glide.yaml b/glide.yaml index 20c49b3af272..46fa0e79e9d3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -45,7 +45,7 @@ import: - package: github.com/google/btree version: 925471ac9e2131377a91e1595defec898166fe49 - package: github.com/grpc-ecosystem/grpc-gateway - version: v1.3.0 + version: v1.3.1 subpackages: - runtime - runtime/internal diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 42009dd470c7..c0305cf3215e 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -373,10 +373,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { } tmpb, berr := tmptx.CreateBucketIfNotExists(next) - tmpb.FillPercent = 0.9 // for seq write in for each if berr != nil { return berr } + tmpb.FillPercent = 0.9 // for seq write in for each b.ForEach(func(k, v []byte) error { count++ diff --git a/raft/log_unstable.go 
b/raft/log_unstable.go index 263af9ce405e..a8a8f5ca132f 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -147,7 +147,7 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { return u.entries[lo-u.offset : hi-u.offset] } -// u.offset <= lo <= hi <= u.offset+len(u.offset) +// u.offset <= lo <= hi <= u.offset+len(u.entries) func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) { if lo > hi { u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi) diff --git a/raft/node.go b/raft/node.go index 33a9db840012..f3ba250b9af9 100644 --- a/raft/node.go +++ b/raft/node.go @@ -324,9 +324,10 @@ func (n *node) run(r *raft) { } case cc := <-n.confc: if cc.NodeID == None { - r.resetPendingConf() select { - case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: + case n.confstatec <- pb.ConfState{ + Nodes: r.nodes(), + Learners: r.learnerNodes()}: case <-n.done: } break @@ -344,12 +345,13 @@ func (n *node) run(r *raft) { } r.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: - r.resetPendingConf() default: panic("unexpected conf type") } select { - case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: + case n.confstatec <- pb.ConfState{ + Nodes: r.nodes(), + Learners: r.learnerNodes()}: case <-n.done: } case <-n.tickc: diff --git a/raft/node_test.go b/raft/node_test.go index 4401412e7743..0ccceb8a2fd9 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -348,6 +348,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { n.Tick() case rd := <-n.Ready(): s.Append(rd.Entries) + applied := false for _, e := range rd.Entries { rdyEntries = append(rdyEntries, e) switch e.Type { @@ -356,10 +357,13 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { var cc raftpb.ConfChange cc.Unmarshal(e.Data) n.ApplyConfChange(cc) - applyConfChan <- struct{}{} + applied = true } } n.Advance() + if applied { + applyConfChan <- struct{}{} + } } } }() @@ -728,3 +732,55 @@ func TestIsHardStateEqual(t *testing.T) { } } } + +func TestNodeProposeAddLearnerNode(t *testing.T) { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + n := newNode() + s := NewMemoryStorage() + r := newTestRaft(1, []uint64{1}, 10, 1, s) + go n.run(r) + n.Campaign(context.TODO()) + stop := make(chan struct{}) + done := make(chan struct{}) + applyConfChan := make(chan struct{}) + go func() { + defer close(done) + for { + select { + case <-stop: + return + case <-ticker.C: + n.Tick() + case rd := <-n.Ready(): + s.Append(rd.Entries) + t.Logf("raft: %v", rd.Entries) + for _, ent := range rd.Entries { + if ent.Type != raftpb.EntryConfChange { + continue + } + var cc raftpb.ConfChange + cc.Unmarshal(ent.Data) + state := n.ApplyConfChange(cc) + if len(state.Learners) == 0 || + state.Learners[0] != cc.NodeID || + cc.NodeID != 2 { + t.Errorf("apply conf change should return new added learner: %v", state.String()) + } + + if len(state.Nodes) != 1 { + t.Errorf("add learner should not change the nodes: %v", state.String()) + } + t.Logf("apply raft conf %v changed to: %v", cc, state.String()) + applyConfChan <- struct{}{} + } + n.Advance() + } + } + }() + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2} + n.ProposeConfChange(context.TODO(), cc) + <-applyConfChan + close(stop) + <-done +} diff --git a/raft/raft.go b/raft/raft.go index 1b191d9ff261..f1fafb1c23db 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -256,8 +256,13 @@ type raft struct { // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. 
leadTransferee uint64 - // New configuration is ignored if there exists unapplied configuration. - pendingConf bool + // Only one conf change may be pending (in the log, but not yet + // applied) at a time. This is enforced via pendingConfIndex, which + // is set to a value >= the log index of the latest pending + // configuration change (if any). Config changes are only allowed to + // be proposed if the leader's applied index is greater than this + // value. + pendingConfIndex uint64 readOnly *readOnly @@ -372,10 +377,16 @@ func (r *raft) hardState() pb.HardState { func (r *raft) quorum() int { return len(r.prs)/2 + 1 } func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs)) + nodes := make([]uint64, 0, len(r.prs)) for id := range r.prs { nodes = append(nodes, id) } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +func (r *raft) learnerNodes() []uint64 { + nodes := make([]uint64, 0, len(r.learnerPrs)) for id := range r.learnerPrs { nodes = append(nodes, id) } @@ -579,7 +590,7 @@ func (r *raft) reset(term uint64) { } }) - r.pendingConf = false + r.pendingConfIndex = 0 r.readOnly = newReadOnly(r.readOnly.option) } @@ -683,12 +694,13 @@ func (r *raft) becomeLeader() { r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) } - nconf := numOfPendingConf(ents) - if nconf > 1 { - panic("unexpected multiple uncommitted config entry") - } - if nconf == 1 { - r.pendingConf = true + // Conservatively set the pendingConfIndex to the last index in the + // log. There may or may not be a pending config change, but it's + // safe to delay any future proposals until we commit all our + // pending log entries, and scanning the entire tail of the log + // could be expensive. + if len(ents) > 0 { + r.pendingConfIndex = ents[len(ents)-1].Index } r.appendEntry(pb.Entry{Data: nil}) @@ -902,11 +914,13 @@ func stepLeader(r *raft, m pb.Message) { for i, e := range m.Entries { if e.Type == pb.EntryConfChange { - if r.pendingConf { - r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String()) + if r.pendingConfIndex > r.raftLog.applied { + r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", + e.String(), r.pendingConfIndex, r.raftLog.applied) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} + } else { + r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 } - r.pendingConf = true } } r.appendEntry(m.Entries...) @@ -1271,7 +1285,6 @@ func (r *raft) addLearner(id uint64) { } func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - r.pendingConf = false pr := r.getProgress(id) if pr == nil { r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) @@ -1307,7 +1320,6 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { func (r *raft) removeNode(id uint64) { r.delProgress(id) - r.pendingConf = false // do not try to commit or abort transferring if there is no nodes in the cluster. 
 	if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
@@ -1325,8 +1337,6 @@
 	}
 }
 
-func (r *raft) resetPendingConf() { r.pendingConf = false }
-
 func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
 	if !isLearner {
 		delete(r.learnerPrs, id)
diff --git a/raft/raft_test.go b/raft/raft_test.go
index c1416d05150a..858fb4a1c53c 100644
--- a/raft/raft_test.go
+++ b/raft/raft_test.go
@@ -2475,8 +2475,12 @@ func TestRestoreWithLearner(t *testing.T) {
 		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
 	}
 	sg := sm.nodes()
-	if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
-		t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
+	if len(sg) != len(s.Metadata.ConfState.Nodes) {
+		t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
+	}
+	lns := sm.learnerNodes()
+	if len(lns) != len(s.Metadata.ConfState.Learners) {
+		t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", lns, s.Metadata.ConfState.Learners)
 	}
 	for _, n := range s.Metadata.ConfState.Nodes {
 		if sm.prs[n].IsLearner {
@@ -2736,8 +2740,8 @@ func TestStepConfig(t *testing.T) {
 	if g := r.raftLog.lastIndex(); g != index+1 {
 		t.Errorf("index = %d, want %d", g, index+1)
 	}
-	if !r.pendingConf {
-		t.Errorf("pendingConf = %v, want true", r.pendingConf)
+	if r.pendingConfIndex != index+1 {
+		t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
 	}
 }
 
@@ -2751,7 +2755,7 @@ func TestStepIgnoreConfig(t *testing.T) {
 	r.becomeLeader()
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
 	index := r.raftLog.lastIndex()
-	pendingConf := r.pendingConf
+	pendingConfIndex := r.pendingConfIndex
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
 	wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
 	ents, err := r.raftLog.entries(index+1, noLimit)
@@ -2761,57 +2765,39 @@
 	if !reflect.DeepEqual(ents, wents) {
 		t.Errorf("ents = %+v, want %+v", ents, wents)
 	}
-	if r.pendingConf != pendingConf {
-		t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
+	if r.pendingConfIndex != pendingConfIndex {
+		t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
 	}
 }
 
-// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
+// TestNewLeaderPendingConfig tests that new leader sets its pendingConfIndex
 // based on uncommitted entries.
-func TestRecoverPendingConfig(t *testing.T) {
+func TestNewLeaderPendingConfig(t *testing.T) {
 	tests := []struct {
-		entType  pb.EntryType
-		wpending bool
+		addEntry      bool
+		wpendingIndex uint64
 	}{
-		{pb.EntryNormal, false},
-		{pb.EntryConfChange, true},
+		{false, 0},
+		{true, 1},
 	}
 	for i, tt := range tests {
 		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
-		r.appendEntry(pb.Entry{Type: tt.entType})
+		if tt.addEntry {
+			r.appendEntry(pb.Entry{Type: pb.EntryNormal})
+		}
 		r.becomeCandidate()
 		r.becomeLeader()
-		if r.pendingConf != tt.wpending {
-			t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
+		if r.pendingConfIndex != tt.wpendingIndex {
+			t.Errorf("#%d: pendingConfIndex = %d, want %d",
+				i, r.pendingConfIndex, tt.wpendingIndex)
 		}
 	}
 }
 
-// TestRecoverDoublePendingConfig tests that new leader will panic if
-// there exist two uncommitted config entries.
-func TestRecoverDoublePendingConfig(t *testing.T) { - func() { - defer func() { - if err := recover(); err == nil { - t.Errorf("expect panic, but nothing happens") - } - }() - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) - r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) - r.becomeCandidate() - r.becomeLeader() - }() -} - -// TestAddNode tests that addNode could update pendingConf and nodes correctly. +// TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.addNode(2) - if r.pendingConf { - t.Errorf("pendingConf = %v, want false", r.pendingConf) - } nodes := r.nodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -2819,16 +2805,12 @@ func TestAddNode(t *testing.T) { } } -// TestAddLearner tests that addLearner could update pendingConf and nodes correctly. +// TestAddLearner tests that addLearner could update nodes correctly. func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.addLearner(2) - if r.pendingConf { - t.Errorf("pendingConf = %v, want false", r.pendingConf) - } - nodes := r.nodes() - wnodes := []uint64{1, 2} + nodes := r.learnerNodes() + wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } @@ -2841,7 +2823,6 @@ func TestAddLearner(t *testing.T) { // immediately when checkQuorum is set. func TestAddNodeCheckQuorum(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.checkQuorum = true r.becomeCandidate() @@ -2872,15 +2853,11 @@ func TestAddNodeCheckQuorum(t *testing.T) { } } -// TestRemoveNode tests that removeNode could update pendingConf, nodes and +// TestRemoveNode tests that removeNode could update nodes and // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.removeNode(2) - if r.pendingConf { - t.Errorf("pendingConf = %v, want false", r.pendingConf) - } w := []uint64{1} if g := r.nodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -2894,23 +2871,23 @@ func TestRemoveNode(t *testing.T) { } } -// TestRemoveLearner tests that removeNode could update pendingConf, nodes and +// TestRemoveLearner tests that removeNode could update nodes and // and removed list correctly. func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.removeNode(2) - if r.pendingConf { - t.Errorf("pendingConf = %v, want false", r.pendingConf) - } w := []uint64{1} if g := r.nodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } + w = []uint64{} + if g := r.learnerNodes(); !reflect.DeepEqual(g, w) { + t.Errorf("nodes = %v, want %v", g, w) + } + // remove all nodes from cluster r.removeNode(1) - w = []uint64{} if g := r.nodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } diff --git a/raft/rawnode.go b/raft/rawnode.go index 925cb851c4ad..fbd7a49e8547 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -169,8 +169,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. 
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { if cc.NodeID == None { - rn.raft.resetPendingConf() - return &pb.ConfState{Nodes: rn.raft.nodes()} + return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} } switch cc.Type { case pb.ConfChangeAddNode: @@ -180,11 +179,10 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { case pb.ConfChangeRemoveNode: rn.raft.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: - rn.raft.resetPendingConf() default: panic("unexpected conf type") } - return &pb.ConfState{Nodes: rn.raft.nodes()} + return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} } // Step advances the state machine using the given message. diff --git a/scripts/genproto.sh b/scripts/genproto.sh index d95729380e79..004079570e80 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -20,7 +20,7 @@ DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/m # exact version of packages to build GOGO_PROTO_SHA="41168f6614b7bb144818ec8967b8c702705df564" -GRPC_GATEWAY_SHA="a92d37fb6339375fa4bb7d9c364f92373fe199c3" +GRPC_GATEWAY_SHA="61c34cc7e0c7a0d85e4237d665e622640279ff3d" SCHWAG_SHA="b7d0fc9aadaaae3d61aaadfc12e4a2f945514912" # set up self-contained GOPATH for building diff --git a/test b/test index 9c16bbc2e797..01dfe64aa454 100755 --- a/test +++ b/test @@ -133,6 +133,9 @@ function functional_pass { -peer-ports 12380,22380,32380 \ -limit 1 \ -schedule-cases "0 1 2 3 4 5" \ + -stress-qps 1000 \ + -stress-key-txn-count 100 \ + -stress-key-txn-ops 10 \ -exit-on-failure && echo "'etcd-tester' succeeded" ETCD_TESTER_EXIT_CODE=$? echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE} diff --git a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go index 2df7c1c17100..85a022626e8a 100644 --- a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go +++ b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go @@ -68,8 +68,7 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) { for { lk, err = c.Lease.KeepAliveOnce(ctx, l.ID) - ev, _ := status.FromError(err) - if ev.Code() == codes.NotFound { + if ev, ok := status.FromError(err); ok && ev.Code() == codes.NotFound { if time.Since(expire) < 0 { log.Fatalf("bad renew! 
exceeded: %v", time.Since(expire)) for { diff --git a/tools/functional-tester/etcd-tester/key_stresser.go b/tools/functional-tester/etcd-tester/key_stresser.go index 3b29fb199597..f4ab77906531 100644 --- a/tools/functional-tester/etcd-tester/key_stresser.go +++ b/tools/functional-tester/etcd-tester/key_stresser.go @@ -34,9 +34,11 @@ import ( type keyStresser struct { Endpoint string - keyLargeSize int - keySize int - keySuffixRange int + keyLargeSize int + keySize int + keySuffixRange int + keyTxnSuffixRange int + keyTxnOps int N int @@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error { {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)}, } + if s.keyTxnSuffixRange > 0 { + // adjust to make up ±70% of workloads with writes + stressEntries[0].weight = 0.24 + stressEntries[1].weight = 0.24 + stressEntries = append(stressEntries, stressEntry{ + weight: 0.24, + f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps), + }) + } s.stressTable = createStressTable(stressEntries) for i := 0; i < s.N; i++ { @@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { } } +func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc { + keys := make([]string, keyTxnSuffixRange) + for i := range keys { + keys[i] = fmt.Sprintf("/k%03d", i) + } + return writeTxn(kvc, keys, txnOps) +} + +func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc { + return func(ctx context.Context) (error, int64) { + ks := make(map[string]struct{}, txnOps) + for len(ks) != txnOps { + ks[keys[rand.Intn(64)]] = struct{}{} + } + selected := make([]string, 0, txnOps) + for k := range ks { + selected = append(selected, k) + } + com, delOp, putOp := getTxnReqs(selected[0], "bar00") + txnReq := &pb.TxnRequest{ + Compare: []*pb.Compare{com}, + Success: []*pb.RequestOp{delOp}, + Failure: []*pb.RequestOp{putOp}, + } + + // add nested txns if any + for i := 1; i < txnOps; i++ { + k, v := selected[i], fmt.Sprintf("bar%02d", i) + com, delOp, putOp = getTxnReqs(k, v) + nested := &pb.RequestOp{ + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Compare: []*pb.Compare{com}, + Success: []*pb.RequestOp{delOp}, + Failure: []*pb.RequestOp{putOp}, + }, + }, + } + txnReq.Success = append(txnReq.Success, nested) + txnReq.Failure = append(txnReq.Failure, nested) + } + + _, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false)) + return err, int64(txnOps) + } +} + +func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) { + // if key exists (version > 0) + com = &pb.Compare{ + Key: []byte(key), + Target: pb.Compare_VERSION, + Result: pb.Compare_GREATER, + TargetUnion: &pb.Compare_Version{Version: 0}, + } + delOp = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{ + Key: []byte(key), + }, + }, + } + putOp = &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(key), + Value: []byte(val), + }, + }, + } + return com, delOp, putOp +} + func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { return func(ctx context.Context) (error, int64) { _, err := kvc.Range(ctx, &pb.RangeRequest{ diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index c0f77c2f1b3f..0197b1a955c4 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -47,6 +47,8 @@ func main() { 
 	stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
 	stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
 	stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
+	stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
+	stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per transaction (max 64).")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
 	exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
 	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
@@ -120,15 +122,23 @@
 	}
 
 	scfg := stressConfig{
-		rateLimiter:    rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
-		keyLargeSize:   int(*stressKeyLargeSize),
-		keySize:        int(*stressKeySize),
-		keySuffixRange: int(*stressKeySuffixRange),
-		numLeases:      10,
-		keysPerLease:   10,
+		rateLimiter:       rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
+		keyLargeSize:      int(*stressKeyLargeSize),
+		keySize:           int(*stressKeySize),
+		keySuffixRange:    int(*stressKeySuffixRange),
+		keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
+		keyTxnOps:         int(*stressKeyTxnOps),
+		numLeases:         10,
+		keysPerLease:      10,
 
 		etcdRunnerPath: *etcdRunnerPath,
 	}
+	if scfg.keyTxnSuffixRange > 100 {
+		plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
+	}
+	if scfg.keyTxnOps > 64 {
+		plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
+	}
 
 	t := &tester{
 		failures: schedule,
diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go
index f9ab3f9fbc65..bf0d88214005 100644
--- a/tools/functional-tester/etcd-tester/stresser.go
+++ b/tools/functional-tester/etcd-tester/stresser.go
@@ -113,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
 }
 
 type stressConfig struct {
-	keyLargeSize   int
-	keySize        int
-	keySuffixRange int
+	keyLargeSize      int
+	keySize           int
+	keySuffixRange    int
+	keyTxnSuffixRange int
+	keyTxnOps         int
 
 	numLeases    int
 	keysPerLease int
@@ -142,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
 		// TODO: Too intensive stressers can panic etcd member with
 		// 'out of memory' error. Put rate limits in server side.
 		return &keyStresser{
-			Endpoint:       m.grpcAddr(),
-			keyLargeSize:   sc.keyLargeSize,
-			keySize:        sc.keySize,
-			keySuffixRange: sc.keySuffixRange,
-			N:              100,
-			rateLimiter:    sc.rateLimiter,
+			Endpoint:          m.grpcAddr(),
+			keyLargeSize:      sc.keyLargeSize,
+			keySize:           sc.keySize,
+			keySuffixRange:    sc.keySuffixRange,
+			keyTxnSuffixRange: sc.keyTxnSuffixRange,
+			keyTxnOps:         sc.keyTxnOps,
+			N:                 100,
+			rateLimiter:       sc.rateLimiter,
 		}
 	case "v2keys":
 		return &v2Stresser{