Skip to content

Commit

Permalink
etcdserver: remove v2 version set; e2e: fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoyinZyc committed Dec 9, 2019
1 parent ed5a01a commit 7784ca8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 43 deletions.
9 changes: 7 additions & 2 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
defer c.Unlock()

c.members, c.removed = membersFromStore(c.lg, c.v2store)
c.version = clusterVersionFromBackend(c.lg, c.be)
if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
}

mustDetectDowngrade(c.lg, c.version)
onSet(c.lg, c.version)

Expand Down Expand Up @@ -766,7 +771,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
if lg != nil {
lg.Panic(
"unexpected number of keys when getting cluster version from backend",
zap.Int("number fo keys", len(keys)),
zap.Int("number-of-key", len(keys)),
)
}
}
Expand Down
6 changes: 1 addition & 5 deletions etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import (
"path"
"time"

"go.etcd.io/etcd/etcdserver/api"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/pkg/pbutil"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -91,10 +89,8 @@ func (a *applierV2store) Put(r *RequestV2) Response {
// return an empty response since there is no consumer.
return Response{}
}
// remove v2 version set to avoid the conflict between v2 and v3.
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
}
// return an empty response since there is no consumer.
return Response{}
}
Expand Down
53 changes: 24 additions & 29 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func (s *EtcdServer) adjustTicks() {
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.goAttach(s.monitorVersions)
Expand Down Expand Up @@ -1997,6 +1997,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
// with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
// TODO: replace publish() in 3.6
func (s *EtcdServer) publishV3(timeout time.Duration) {
req := &membershippb.ClusterMemberAttrSetRequest{
Member_ID: uint64(s.id),
Expand All @@ -2005,18 +2006,16 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
ClientUrls: s.attributes.ClientURLs,
},
}

lg := s.getLogger()
for {
select {
case <-s.stopping:
if lg := s.getLogger(); lg != nil {
lg.Warn(
"stopped publish because server is stopping",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
)
}
lg.Warn(
"stopped publish because server is stopping",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
)
return

default:
Expand All @@ -2028,27 +2027,23 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
switch err {
case nil:
close(s.readych)
if lg := s.getLogger(); lg != nil {
lg.Info(
"published local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.String("cluster-id", s.cluster.ID().String()),
zap.Duration("publish-timeout", timeout),
)
}
lg.Info(
"published local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.String("cluster-id", s.cluster.ID().String()),
zap.Duration("publish-timeout", timeout),
)
return

default:
if lg := s.getLogger(); lg != nil {
lg.Warn(
"failed to publish local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
zap.Error(err),
)
}
lg.Warn(
"failed to publish local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
zap.Error(err),
)
}
}
}
Expand All @@ -2063,7 +2058,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
// but does not go through v2 API endpoint, which means even with v2
// client handler disabled (e.g. --enable-v2=false), cluster can still
// process publish requests through rafthttp
// TODO: Deprecate v2 store
// TODO: Deprecate v2 store in 3.6
func (s *EtcdServer) publish(timeout time.Duration) {
b, err := json.Marshal(s.attributes)
if err != nil {
Expand Down
23 changes: 16 additions & 7 deletions tests/e2e/ctl_v3_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
func TestCtlV3Migrate(t *testing.T) {
defer testutil.AfterTest(t)

epc := setupEtcdctlTest(t, &configNoTLS, false)
cfg := configNoTLS
cfg.enableV2 = true
epc := setupEtcdctlTest(t, &cfg, false)
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
Expand Down Expand Up @@ -69,10 +71,6 @@ func TestCtlV3Migrate(t *testing.T) {
t.Fatal(err)
}

// to ensure revision increment is continuous from migrated v2 data
if err := ctlV3Put(cx, "test", "value", ""); err != nil {
t.Fatal(err)
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: epc.EndpointsV3(),
DialTimeout: 3 * time.Second,
Expand All @@ -85,11 +83,22 @@ func TestCtlV3Migrate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
revAfterMigrate := resp.Header.Revision
// to ensure revision increment is continuous from migrated v2 data
if err := ctlV3Put(cx, "test", "value", ""); err != nil {
t.Fatal(err)
}

resp, err = cli.Get(context.TODO(), "test")
if err != nil {
t.Fatal(err)
}
if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
}
if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)

if resp.Kvs[0].CreateRevision != revAfterMigrate+1 {
t.Fatalf("expected revision increment is continuous from migrated v2, got %d", resp.Kvs[0].CreateRevision)
}
}

Expand Down

0 comments on commit 7784ca8

Please sign in to comment.