diff --git a/CHANGELOG-3.4.md b/CHANGELOG-3.4.md index 5ac64beba39..e5d5253de48 100644 --- a/CHANGELOG-3.4.md +++ b/CHANGELOG-3.4.md @@ -19,6 +19,7 @@ - Move `cmd/vendor` directory to `vendor` at repository root. - Remove recursive symlinks in `cmd` directory. - Now `go get/install/build` on `etcd` packages (e.g. `clientv3`, `tools/benchmark`) enforce builds with etcd `vendor` directory. +- Rename `"github.com/coreos/etcd/snap"` to [`"github.com/coreos/etcd/internal/raftsnap"`](https://github.com/coreos/etcd/pull/9211). ### Added(API) diff --git a/contrib/raftexample/kvstore.go b/contrib/raftexample/kvstore.go index d877bc7ad5e..077d4959fc0 100644 --- a/contrib/raftexample/kvstore.go +++ b/contrib/raftexample/kvstore.go @@ -21,7 +21,7 @@ import ( "log" "sync" - "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/internal/raftsnap" ) // a key-value store backed by raft @@ -29,7 +29,7 @@ type kvstore struct { proposeC chan<- string // channel for proposing updates mu sync.RWMutex kvStore map[string]string // current committed key-value pairs - snapshotter *snap.Snapshotter + snapshotter *raftsnap.Snapshotter } type kv struct { @@ -37,7 +37,7 @@ type kv struct { Val string } -func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { +func newKVStore(snapshotter *raftsnap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter} // replay log into key-value map s.readCommits(commitC, errorC) @@ -67,10 +67,10 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { // done replaying log; new data incoming // OR signaled to load snapshot snapshot, err := s.snapshotter.Load() - if err == snap.ErrNoSnapshot { + if err == raftsnap.ErrNoSnapshot { return } - if err != nil && err != snap.ErrNoSnapshot { + if err != nil && err != raftsnap.ErrNoSnapshot { log.Panic(err) } log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 2b7f3207a90..c53c6f1f58a 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -25,12 +25,12 @@ import ( "time" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" ) @@ -59,8 +59,8 @@ type raftNode struct { raftStorage *raft.MemoryStorage wal *wal.WAL - snapshotter *snap.Snapshotter - snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready + snapshotter *raftsnap.Snapshotter + snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready snapCount uint64 transport *rafthttp.Transport @@ -77,7 +77,7 @@ var defaultSnapCount uint64 = 10000 // commit channel, followed by a nil message (to indicate the channel is // current), then new log entries. To shutdown, close proposeC and read errorC. func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, - confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) { + confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *raftsnap.Snapshotter) { commitC := make(chan *string) errorC := make(chan error) @@ -98,7 +98,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, httpstopc: make(chan struct{}), httpdonec: make(chan struct{}), - snapshotterReady: make(chan *snap.Snapshotter, 1), + snapshotterReady: make(chan *raftsnap.Snapshotter, 1), // rest of structure populated after WAL replay } go rc.startRaft() @@ -188,7 +188,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { snapshot, err := rc.snapshotter.Load() - if err != nil && err != snap.ErrNoSnapshot { + if err != nil && err != raftsnap.ErrNoSnapshot { log.Fatalf("raftexample: error loading snapshot (%v)", err) } return snapshot @@ -261,7 +261,7 @@ func (rc *raftNode) startRaft() { log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) } } - rc.snapshotter = snap.New(rc.snapdir) + rc.snapshotter = raftsnap.New(rc.snapdir) rc.snapshotterReady <- rc.snapshotter oldwal := wal.Exist(rc.waldir) diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index 82e61d8ab29..0a8714cd24f 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -25,11 +25,11 @@ import ( "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" @@ -102,14 +102,14 @@ func handleBackup(c *cli.Context) error { } func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) { - ss := snap.New(srcSnap) + ss := raftsnap.New(srcSnap) snapshot, err := ss.Load() - if err != nil && err != snap.ErrNoSnapshot { + if err != nil && err != raftsnap.ErrNoSnapshot { log.Fatal(err) } if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term - newss := snap.New(destSnap) + newss := raftsnap.New(destSnap) if err = newss.SaveSnap(*snapshot); err != nil { log.Fatal(err) } diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index 48c17c8ab4a..f9e28ebad2c 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -30,13 +30,13 @@ import ( "github.com/coreos/etcd/etcdserver/api" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" @@ -134,9 +134,9 @@ func rebuildStoreV2() (store.Store, uint64) { } snapdir := filepath.Join(migrateDatadir, "member", "snap") - ss := snap.New(snapdir) + ss := raftsnap.New(snapdir) snapshot, err := ss.Load() - if err != nil && err != snap.ErrNoSnapshot { + if err != nil && err != raftsnap.ErrNoSnapshot { ExitWithError(ExitError, err) } diff --git a/etcdserver/backend.go b/etcdserver/backend.go index 01a84d04d48..b66c549a166 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -19,11 +19,11 @@ import ( "os" "time" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" ) func newBackend(cfg ServerConfig) backend.Backend { @@ -37,7 +37,7 @@ func newBackend(cfg ServerConfig) backend.Backend { } // openSnapshotBackend renames a snapshot db to the current etcd db and opens it. -func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { +func openSnapshotBackend(cfg ServerConfig, ss *raftsnap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) if err != nil { return nil, fmt.Errorf("database snapshot file path error: %v", err) @@ -77,5 +77,5 @@ func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot ra return oldbe, nil } oldbe.Close() - return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot) + return openSnapshotBackend(cfg, raftsnap.New(cfg.SnapDir()), snapshot) } diff --git a/etcdserver/server.go b/etcdserver/server.go index 780611f70fa..4bf6eded8e1 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -38,6 +38,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/mvcc" @@ -52,7 +53,6 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/version" "github.com/coreos/etcd/wal" @@ -206,7 +206,7 @@ type EtcdServer struct { cluster *membership.RaftCluster store store.Store - snapshotter *snap.Snapshotter + snapshotter *raftsnap.Snapshotter applyV2 ApplierV2 @@ -279,7 +279,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil { plog.Fatalf("create snapshot directory error: %v", err) } - ss := snap.New(cfg.SnapDir()) + ss := raftsnap.New(cfg.SnapDir()) bepath := cfg.backendPath() beExist := fileutil.Exist(bepath) @@ -373,7 +373,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } snapshot, err = ss.Load() - if err != nil && err != snap.ErrNoSnapshot { + if err != nil && err != raftsnap.ErrNoSnapshot { return nil, err } if snapshot != nil { @@ -1266,7 +1266,7 @@ func (s *EtcdServer) publish(timeout time.Duration) { } } -func (s *EtcdServer) sendMergedSnap(merged snap.Message) { +func (s *EtcdServer) sendMergedSnap(merged raftsnap.Message) { atomic.AddInt64(&s.inflightSnapshots, 1) s.r.transport.SendSnapshot(merged) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e3ea0f9250c..6ae4f6d108b 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -28,6 +28,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" @@ -43,7 +44,6 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" ) @@ -986,7 +986,7 @@ func TestSnapshotOrdering(t *testing.T) { Cfg: ServerConfig{DataDir: testdir}, r: *r, store: st, - snapshotter: snap.New(snapdir), + snapshotter: raftsnap.New(snapdir), cluster: cl, SyncTicker: &time.Ticker{}, } @@ -1111,7 +1111,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { Cfg: ServerConfig{DataDir: testdir}, r: *r, store: st, - snapshotter: snap.New(testdir), + snapshotter: raftsnap.New(testdir), cluster: cl, SyncTicker: &time.Ticker{}, } diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index 928aa95b6b1..b238b4c55da 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -17,15 +17,15 @@ package etcdserver import ( "io" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" ) // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf), // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message // as ReadCloser. -func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message { +func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) raftsnap.Message { // get a snapshot of v2 store as []byte clone := s.store.Clone() d, err := clone.SaveNoCopy() @@ -51,7 +51,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi } m.Snapshot = snapshot - return *snap.NewMessage(m, rc, dbsnap.Size()) + return *raftsnap.NewMessage(m, rc, dbsnap.Size()) } func newSnapshotReaderCloser(snapshot backend.Snapshot) io.ReadCloser { diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 55c2dd4b6a4..b567c518ce0 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -18,10 +18,10 @@ import ( "io" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" ) @@ -38,10 +38,10 @@ type Storage interface { type storage struct { *wal.WAL - *snap.Snapshotter + *raftsnap.Snapshotter } -func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { +func NewStorage(w *wal.WAL, s *raftsnap.Snapshotter) Storage { return &storage{w, s} } diff --git a/etcdserver/util_test.go b/etcdserver/util_test.go index 79edabd1220..7c07bcd19e1 100644 --- a/etcdserver/util_test.go +++ b/etcdserver/util_test.go @@ -20,10 +20,10 @@ import ( "time" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/etcd/snap" ) func TestLongestConnected(t *testing.T) { @@ -76,7 +76,7 @@ func newNopTransporterWithActiveTime(memberIDs []types.ID) rafthttp.Transporter func (s *nopTransporterWithActiveTime) Start() error { return nil } func (s *nopTransporterWithActiveTime) Handler() http.Handler { return nil } func (s *nopTransporterWithActiveTime) Send(m []raftpb.Message) {} -func (s *nopTransporterWithActiveTime) SendSnapshot(m snap.Message) {} +func (s *nopTransporterWithActiveTime) SendSnapshot(m raftsnap.Message) {} func (s *nopTransporterWithActiveTime) AddRemote(id types.ID, us []string) {} func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {} func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {} diff --git a/snap/db.go b/internal/raftsnap/db.go similarity index 99% rename from snap/db.go rename to internal/raftsnap/db.go index 01d897ae861..cf9ffccb05d 100644 --- a/snap/db.go +++ b/internal/raftsnap/db.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package snap +package raftsnap import ( "errors" diff --git a/internal/raftsnap/doc.go b/internal/raftsnap/doc.go new file mode 100644 index 00000000000..68026689e79 --- /dev/null +++ b/internal/raftsnap/doc.go @@ -0,0 +1,17 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package raftsnap handles Raft nodes' states with snapshots. +// The snapshot logic is internal to etcd server and raft package. +package raftsnap diff --git a/snap/message.go b/internal/raftsnap/message.go similarity index 99% rename from snap/message.go rename to internal/raftsnap/message.go index d73713ff169..3826b2cb2e1 100644 --- a/snap/message.go +++ b/internal/raftsnap/message.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package snap +package raftsnap import ( "io" diff --git a/snap/metrics.go b/internal/raftsnap/metrics.go similarity index 98% rename from snap/metrics.go rename to internal/raftsnap/metrics.go index 433ef09d4ba..9ed7fa871eb 100644 --- a/snap/metrics.go +++ b/internal/raftsnap/metrics.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package snap +package raftsnap import "github.com/prometheus/client_golang/prometheus" diff --git a/snap/snappb/snap.pb.go b/internal/raftsnap/snappb/snap.pb.go similarity index 100% rename from snap/snappb/snap.pb.go rename to internal/raftsnap/snappb/snap.pb.go diff --git a/snap/snappb/snap.proto b/internal/raftsnap/snappb/snap.proto similarity index 100% rename from snap/snappb/snap.proto rename to internal/raftsnap/snappb/snap.proto diff --git a/snap/snapshotter.go b/internal/raftsnap/snapshotter.go similarity index 97% rename from snap/snapshotter.go rename to internal/raftsnap/snapshotter.go index 00755592129..7ebd6fd3efc 100644 --- a/snap/snapshotter.go +++ b/internal/raftsnap/snapshotter.go @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package snap stores raft nodes' states with snapshots. -package snap +package raftsnap import ( "errors" @@ -26,11 +25,11 @@ import ( "strings" "time" + "github.com/coreos/etcd/internal/raftsnap/snappb" pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap/snappb" "github.com/coreos/pkg/capnslog" ) diff --git a/snap/snapshotter_test.go b/internal/raftsnap/snapshotter_test.go similarity index 99% rename from snap/snapshotter_test.go rename to internal/raftsnap/snapshotter_test.go index 6af823f03bf..368154e2314 100644 --- a/snap/snapshotter_test.go +++ b/internal/raftsnap/snapshotter_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package snap +package raftsnap import ( "fmt" diff --git a/rafthttp/http.go b/rafthttp/http.go index cc89e171ee2..c3c095f7cb3 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -23,10 +23,10 @@ import ( "path" "strings" + "github.com/coreos/etcd/internal/raftsnap" pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/version" ) @@ -136,11 +136,11 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type snapshotHandler struct { tr Transporter r Raft - snapshotter *snap.Snapshotter + snapshotter *raftsnap.Snapshotter cid types.ID } -func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler { +func newSnapshotHandler(tr Transporter, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler { return &snapshotHandler{ tr: tr, r: r, diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index e7f6e034f1e..44622c8143a 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -26,10 +26,10 @@ import ( "testing" "time" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/version" ) @@ -356,7 +356,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { msgs []raftpb.Message - snapMsgs []snap.Message + snapMsgs []raftsnap.Message peerURLs types.URLs connc chan *outgoingConn paused bool @@ -377,7 +377,7 @@ func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } -func (pr *fakePeer) sendSnap(m snap.Message) { +func (pr *fakePeer) sendSnap(m raftsnap.Message) { if pr.paused { return } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 9aee4dbc919..7cf64a0247b 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -20,10 +20,10 @@ import ( "time" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "golang.org/x/time/rate" ) @@ -63,7 +63,7 @@ type Peer interface { // sendSnap sends the merged snapshot message to the remote peer. Its behavior // is similar to send. - sendSnap(m snap.Message) + sendSnap(m raftsnap.Message) // update updates the urls of remote peer. update(urls types.URLs) @@ -233,7 +233,7 @@ func (p *peer) send(m raftpb.Message) { } } -func (p *peer) sendSnap(m snap.Message) { +func (p *peer) sendSnap(m raftsnap.Message) { go p.snapSender.send(m) } diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 52273c9d195..342a79a1c86 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -22,11 +22,11 @@ import ( "net/http" "time" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/httputil" pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/snap" ) var ( @@ -63,7 +63,7 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe func (s *snapshotSender) stop() { close(s.stopc) } -func (s *snapshotSender) send(merged snap.Message) { +func (s *snapshotSender) send(merged raftsnap.Message) { m := merged.Message body := createSnapBody(merged) @@ -142,7 +142,7 @@ func (s *snapshotSender) post(req *http.Request) (err error) { } } -func createSnapBody(merged snap.Message) io.ReadCloser { +func createSnapBody(merged raftsnap.Message) io.ReadCloser { buf := new(bytes.Buffer) enc := &messageEncoder{w: buf} // encode raft message diff --git a/rafthttp/snapshot_test.go b/rafthttp/snapshot_test.go index fbf482d0999..ed8220fad64 100644 --- a/rafthttp/snapshot_test.go +++ b/rafthttp/snapshot_test.go @@ -25,9 +25,9 @@ import ( "testing" "time" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" ) type strReaderCloser struct{ *strings.Reader } @@ -82,7 +82,7 @@ func TestSnapshotSend(t *testing.T) { } for i, tt := range tests { - sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size)) + sent, files := testSnapshotSend(t, raftsnap.NewMessage(tt.m, tt.rc, tt.size)) if tt.wsent != sent { t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent) } @@ -92,7 +92,7 @@ func TestSnapshotSend(t *testing.T) { } } -func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) { +func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) { d, err := ioutil.TempDir(os.TempDir(), "snapdir") if err != nil { t.Fatal(err) @@ -102,7 +102,7 @@ func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) { r := &fakeRaft{} tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r} ch := make(chan struct{}, 1) - h := &syncHandler{newSnapshotHandler(tr, r, snap.New(d), types.ID(1)), ch} + h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(d), types.ID(1)), ch} srv := httptest.NewServer(h) defer srv.Close() diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 51574ba5eda..6a127d972d1 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -21,12 +21,12 @@ import ( "time" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/pkg/capnslog" "github.com/xiang90/probing" @@ -60,7 +60,7 @@ type Transporter interface { Send(m []raftpb.Message) // SendSnapshot sends out the given snapshot message to a remote peer. // The behavior of SendSnapshot is similar to Send. - SendSnapshot(m snap.Message) + SendSnapshot(m raftsnap.Message) // AddRemote adds a remote with given peer urls into the transport. // A remote helps newly joined member to catch up the progress of cluster, // and will not be used after that. @@ -107,7 +107,7 @@ type Transport struct { URLs types.URLs // local peer URLs ClusterID types.ID // raft cluster ID for request validation Raft Raft // raft state machine, to which the Transport forwards received messages and reports status - Snapshotter *snap.Snapshotter + Snapshotter *raftsnap.Snapshotter ServerStats *stats.ServerStats // used to record general transportation statistics // used to record transportation statistics with followers when // performing as leader in raft protocol @@ -346,7 +346,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time { return time.Time{} } -func (t *Transport) SendSnapshot(m snap.Message) { +func (t *Transport) SendSnapshot(m raftsnap.Message) { t.mu.Lock() defer t.mu.Unlock() p := t.peers[types.ID(m.To)] @@ -384,7 +384,7 @@ func NewNopTransporter() Transporter { func (s *nopTransporter) Start() error { return nil } func (s *nopTransporter) Handler() http.Handler { return nil } func (s *nopTransporter) Send(m []raftpb.Message) {} -func (s *nopTransporter) SendSnapshot(m snap.Message) {} +func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {} func (s *nopTransporter) AddRemote(id types.ID, us []string) {} func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} @@ -397,18 +397,18 @@ func (s *nopTransporter) Resume() {} type snapTransporter struct { nopTransporter - snapDoneC chan snap.Message + snapDoneC chan raftsnap.Message snapDir string } -func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) { - ch := make(chan snap.Message, 1) +func NewSnapTransporter(snapDir string) (Transporter, <-chan raftsnap.Message) { + ch := make(chan raftsnap.Message, 1) tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir} return tr, ch } -func (s *snapTransporter) SendSnapshot(m snap.Message) { - ss := snap.New(s.snapDir) +func (s *snapTransporter) SendSnapshot(m raftsnap.Message) { + ss := raftsnap.New(s.snapDir) ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) m.CloseWithError(nil) s.snapDoneC <- m diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 004079570e8..253c5a6c4aa 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -16,7 +16,7 @@ if [[ $(protoc --version | cut -f2 -d' ') != "3.5.1" ]]; then fi # directories containing protos to be built -DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb" +DIRS="./wal/walpb ./etcdserver/etcdserverpb ./raftsnap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb" # exact version of packages to build GOGO_PROTO_SHA="41168f6614b7bb144818ec8967b8c702705df564" diff --git a/snapshot/v3_snapshot.go b/snapshot/v3_snapshot.go index fdde5dace55..045f95312b0 100644 --- a/snapshot/v3_snapshot.go +++ b/snapshot/v3_snapshot.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" @@ -38,7 +39,6 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" @@ -420,7 +420,7 @@ func (s *v3Manager) saveWALAndSnap() error { }, }, } - sn := snap.New(s.snapDir) + sn := raftsnap.New(s.snapDir) if err := sn.SaveSnap(raftSnap); err != nil { return err } diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index fb72be10050..e586d1bb413 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -22,10 +22,10 @@ import ( "time" "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" ) @@ -57,10 +57,10 @@ func main() { walsnap.Index = *index } else { if *snapfile == "" { - ss := snap.New(snapDir(dataDir)) + ss := raftsnap.New(snapDir(dataDir)) snapshot, err = ss.Load() } else { - snapshot, err = snap.Read(filepath.Join(snapDir(dataDir), *snapfile)) + snapshot, err = raftsnap.Read(filepath.Join(snapDir(dataDir), *snapfile)) } switch err { @@ -69,7 +69,7 @@ func main() { nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes) fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n", walsnap.Term, walsnap.Index, nodes) - case snap.ErrNoSnapshot: + case raftsnap.ErrNoSnapshot: fmt.Printf("Snapshot:\nempty\n") default: log.Fatalf("Failed loading snapshot: %v", err)