diff --git a/etcd_test.go b/etcd_test.go index d868be89667..95e4c3c525e 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -32,7 +32,6 @@ func TestSingleNode(t *testing.T) { time.Sleep(time.Second) - etcd.OpenDebug() c := etcd.NewClient() c.SyncCluster() diff --git a/server/join_command.go b/server/join_command.go index c8ad6760b3a..d1a0dcc4250 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -14,18 +14,18 @@ func init() { // The JoinCommand adds a node to the cluster. type JoinCommand struct { - RaftVersion string `json:"raftVersion"` - Name string `json:"name"` - RaftURL string `json:"raftURL"` - EtcdURL string `json:"etcdURL"` + RaftVersion string `json:"raftVersion"` + Name string `json:"name"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` } func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand { return &JoinCommand{ - RaftVersion: version, - Name: name, - RaftURL: raftUrl, - EtcdURL: etcdUrl, + RaftVersion: version, + Name: name, + RaftURL: raftUrl, + EtcdURL: etcdUrl, } } diff --git a/server/peer_server.go b/server/peer_server.go index d016717b17f..0b16f98e8ef 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -1,363 +1,362 @@ package server import ( - "bytes" - "crypto/tls" - "encoding/binary" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "time" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" + "bytes" + "crypto/tls" + "encoding/binary" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) type PeerServer struct { - *raft.Server - server *Server - joinIndex uint64 - name string - url string - listenHost string - tlsConf *TLSConfig - tlsInfo *TLSInfo - followersStats *raftFollowersStats - serverStats *raftServerStats - registry *Registry - store *store.Store - snapConf *snapshotConf - MaxClusterSize int - RetryTimes int + *raft.Server + server *Server + joinIndex uint64 + name string + url string + listenHost string + tlsConf *TLSConfig + tlsInfo *TLSInfo + followersStats *raftFollowersStats + serverStats *raftServerStats + registry *Registry + store *store.Store + snapConf *snapshotConf + MaxClusterSize int + RetryTimes int } // TODO: find a good policy to do snapshot type snapshotConf struct { - // Etcd will check if snapshot is need every checkingInterval - checkingInterval time.Duration + // Etcd will check if snapshot is need every checkingInterval + checkingInterval time.Duration - // The number of writes when the last snapshot happened - lastWrites uint64 + // The number of writes when the last snapshot happened + lastWrites uint64 - // If the incremental number of writes since the last snapshot - // exceeds the write Threshold, etcd will do a snapshot - writesThr uint64 + // If the incremental number of writes since the last snapshot + // exceeds the write Threshold, etcd will do a snapshot + writesThr uint64 } - func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer { - s := &PeerServer{ - name: name, - url: url, - listenHost: listenHost, - tlsConf: tlsConf, - tlsInfo: tlsInfo, - registry: registry, - store: store, - snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000}, - followersStats: &raftFollowersStats{ - Leader: name, - Followers: make(map[string]*raftFollowerStats), - }, - serverStats: &raftServerStats{ - StartTime: time.Now(), - sendRateQueue: &statsQueue{ - back: -1, - }, - recvRateQueue: &statsQueue{ - back: -1, - }, - }, - } - - // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) - - // Create raft server - server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") - if err != nil { - log.Fatal(err) - } - - s.Server = server - - return s + s := &PeerServer{ + name: name, + url: url, + listenHost: listenHost, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + registry: registry, + store: store, + snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000}, + followersStats: &raftFollowersStats{ + Leader: name, + Followers: make(map[string]*raftFollowerStats), + }, + serverStats: &raftServerStats{ + StartTime: time.Now(), + sendRateQueue: &statsQueue{ + back: -1, + }, + recvRateQueue: &statsQueue{ + back: -1, + }, + }, + } + + // Create transporter for raft + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) + + // Create raft server + server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") + if err != nil { + log.Fatal(err) + } + + s.Server = server + + return s } // Start the raft server func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { - // LoadSnapshot - if snapshot { - err := s.LoadSnapshot() - - if err == nil { - log.Debugf("%s finished load snapshot", s.name) - } else { - log.Debug(err) - } - } - - s.SetElectionTimeout(ElectionTimeout) - s.SetHeartbeatTimeout(HeartbeatTimeout) - - s.Start() - - if s.IsLogEmpty() { - // start as a leader in a new cluster - if len(cluster) == 0 { - s.startAsLeader() - } else { - s.startAsFollower(cluster) - } - - } else { - // Rejoin the previous cluster - cluster = s.registry.PeerURLs(s.Leader(), s.name) - for i := 0; i < len(cluster); i++ { - u, err := url.Parse(cluster[i]) - if err != nil { - log.Debug("rejoin cannot parse url: ", err) - } - cluster[i] = u.Host - } - ok := s.joinCluster(cluster) - if !ok { - log.Warn("the entire cluster is down! this machine will restart the cluster.") - } - - log.Debugf("%s restart as a follower", s.name) - } - - // open the snapshot - if snapshot { - go s.monitorSnapshot() - } - - // start to response to raft requests - go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server) + // LoadSnapshot + if snapshot { + err := s.LoadSnapshot() + + if err == nil { + log.Debugf("%s finished load snapshot", s.name) + } else { + log.Debug(err) + } + } + + s.SetElectionTimeout(ElectionTimeout) + s.SetHeartbeatTimeout(HeartbeatTimeout) + + s.Start() + + if s.IsLogEmpty() { + // start as a leader in a new cluster + if len(cluster) == 0 { + s.startAsLeader() + } else { + s.startAsFollower(cluster) + } + + } else { + // Rejoin the previous cluster + cluster = s.registry.PeerURLs(s.Leader(), s.name) + for i := 0; i < len(cluster); i++ { + u, err := url.Parse(cluster[i]) + if err != nil { + log.Debug("rejoin cannot parse url: ", err) + } + cluster[i] = u.Host + } + ok := s.joinCluster(cluster) + if !ok { + log.Warn("the entire cluster is down! this machine will restart the cluster.") + } + + log.Debugf("%s restart as a follower", s.name) + } + + // open the snapshot + if snapshot { + go s.monitorSnapshot() + } + + // start to response to raft requests + go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server) } // Retrieves the underlying Raft server. func (s *PeerServer) RaftServer() *raft.Server { - return s.Server + return s.Server } // Associates the client server with the peer server. func (s *PeerServer) SetServer(server *Server) { - s.server = server + s.server = server } // Get all the current logs func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] GET %s/log", s.url) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(s.LogEntries()) + log.Debugf("[recv] GET %s/log", s.url) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(s.LogEntries()) } // Response to vote request func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { - rvreq := &raft.RequestVoteRequest{} - err := decodeJsonRequest(req, rvreq) - if err == nil { - log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName) - if resp := s.RequestVote(rvreq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - log.Warnf("[vote] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + rvreq := &raft.RequestVoteRequest{} + err := decodeJsonRequest(req, rvreq) + if err == nil { + log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName) + if resp := s.RequestVote(rvreq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + log.Warnf("[vote] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) } // Response to append entries request func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.AppendEntriesRequest{} - err := decodeJsonRequest(req, aereq) - - if err == nil { - log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries)) - - s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) - - if resp := s.AppendEntries(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - if !resp.Success { - log.Debugf("[Append Entry] Step back") - } - return - } - } - log.Warnf("[Append Entry] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + aereq := &raft.AppendEntriesRequest{} + err := decodeJsonRequest(req, aereq) + + if err == nil { + log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries)) + + s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) + + if resp := s.AppendEntries(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + if !resp.Success { + log.Debugf("[Append Entry] Step back") + } + return + } + } + log.Warnf("[Append Entry] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) } // Response to recover from snapshot request func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - log.Debugf("[recv] POST %s/snapshot/ ", s.url) - if resp := s.RequestSnapshot(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - log.Warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + aereq := &raft.SnapshotRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + log.Debugf("[recv] POST %s/snapshot/ ", s.url) + if resp := s.RequestSnapshot(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + log.Warnf("[Snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) } // Response to recover from snapshot request func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRecoveryRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url) - if resp := s.SnapshotRecoveryRequest(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - log.Warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + aereq := &raft.SnapshotRecoveryRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url) + if resp := s.SnapshotRecoveryRequest(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + log.Warnf("[Snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) } // Get the port that listening for etcd connecting of the server func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/etcdURL/ ", s.url) - w.WriteHeader(http.StatusOK) - w.Write([]byte(s.server.URL())) + log.Debugf("[recv] Get %s/etcdURL/ ", s.url) + w.WriteHeader(http.StatusOK) + w.Write([]byte(s.server.URL())) } // Response to the join request func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - command := &JoinCommand{} - - // Write CORS header. - if s.server.OriginAllowed("*") { - w.Header().Add("Access-Control-Allow-Origin", "*") - } else if s.server.OriginAllowed(req.Header.Get("Origin")) { - w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin")) - } - - err := decodeJsonRequest(req, command) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - log.Debugf("Receive Join Request from %s", command.Name) - err = s.dispatchRaftCommand(command, w, req) - - // Return status. - if err != nil { - if etcdErr, ok := err.(*etcdErr.Error); ok { - log.Debug("Return error: ", (*etcdErr).Error()) - etcdErr.Write(w) - } else { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } + command := &JoinCommand{} + + // Write CORS header. + if s.server.OriginAllowed("*") { + w.Header().Add("Access-Control-Allow-Origin", "*") + } else if s.server.OriginAllowed(req.Header.Get("Origin")) { + w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin")) + } + + err := decodeJsonRequest(req, command) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + log.Debugf("Receive Join Request from %s", command.Name) + err = s.dispatchRaftCommand(command, w, req) + + // Return status. + if err != nil { + if etcdErr, ok := err.(*etcdErr.Error); ok { + log.Debug("Return error: ", (*etcdErr).Error()) + etcdErr.Write(w) + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } } // Response to remove request func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "DELETE" { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } + if req.Method != "DELETE" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } - nodeName := req.URL.Path[len("/remove/"):] - command := &RemoveCommand{ - Name: nodeName, - } + nodeName := req.URL.Path[len("/remove/"):] + command := &RemoveCommand{ + Name: nodeName, + } - log.Debugf("[recv] Remove Request [%s]", command.Name) + log.Debugf("[recv] Remove Request [%s]", command.Name) - s.dispatchRaftCommand(command, w, req) + s.dispatchRaftCommand(command, w, req) } // Response to the name request func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/name/ ", s.url) - w.WriteHeader(http.StatusOK) - w.Write([]byte(s.name)) + log.Debugf("[recv] Get %s/name/ ", s.url) + w.WriteHeader(http.StatusOK) + w.Write([]byte(s.name)) } // Response to the name request func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/version/ ", s.url) - w.WriteHeader(http.StatusOK) - w.Write([]byte(PeerVersion)) + log.Debugf("[recv] Get %s/version/ ", s.url) + w.WriteHeader(http.StatusOK) + w.Write([]byte(PeerVersion)) } func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error { - return s.dispatch(c, w, req) + return s.dispatch(c, w, req) } func (s *PeerServer) startAsLeader() { - // leader need to join self as a peer - for { - _, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL())) - if err == nil { - break - } - } - log.Debugf("%s start as a leader", s.name) + // leader need to join self as a peer + for { + _, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL())) + if err == nil { + break + } + } + log.Debugf("%s start as a leader", s.name) } func (s *PeerServer) startAsFollower(cluster []string) { - // start as a follower in a existing cluster - for i := 0; i < s.RetryTimes; i++ { - ok := s.joinCluster(cluster) - if ok { - return - } - log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) - time.Sleep(time.Second * RetryInterval) - } - - log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes) + // start as a follower in a existing cluster + for i := 0; i < s.RetryTimes; i++ { + ok := s.joinCluster(cluster) + if ok { + return + } + log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) + time.Sleep(time.Second * RetryInterval) + } + + log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes) } // Start to listen and response raft command func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { - log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) - - raftMux := http.NewServeMux() - - server := &http.Server{ - Handler: raftMux, - TLSConfig: &tlsConf, - Addr: s.listenHost, - } - - // internal commands - raftMux.HandleFunc("/name", s.NameHttpHandler) - raftMux.HandleFunc("/version", s.RaftVersionHttpHandler) - raftMux.HandleFunc("/join", s.JoinHttpHandler) - raftMux.HandleFunc("/remove/", s.RemoveHttpHandler) - raftMux.HandleFunc("/vote", s.VoteHttpHandler) - raftMux.HandleFunc("/log", s.GetLogHttpHandler) - raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler) - raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler) - raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) - raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) - - if scheme == "http" { - log.Fatal(server.ListenAndServe()) - } else { - log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) - } + log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) + + raftMux := http.NewServeMux() + + server := &http.Server{ + Handler: raftMux, + TLSConfig: &tlsConf, + Addr: s.listenHost, + } + + // internal commands + raftMux.HandleFunc("/name", s.NameHttpHandler) + raftMux.HandleFunc("/version", s.RaftVersionHttpHandler) + raftMux.HandleFunc("/join", s.JoinHttpHandler) + raftMux.HandleFunc("/remove/", s.RemoveHttpHandler) + raftMux.HandleFunc("/vote", s.VoteHttpHandler) + raftMux.HandleFunc("/log", s.GetLogHttpHandler) + raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler) + raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler) + raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) + raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) + + if scheme == "http" { + log.Fatal(server.ListenAndServe()) + } else { + log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) + } } @@ -365,184 +364,182 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { // will need to do something more sophisticated later when we allow mixed // version clusters. func getVersion(t *transporter, versionURL url.URL) (string, error) { - resp, req, err := t.Get(versionURL.String()) - if err != nil { - return "", err - } - defer resp.Body.Close() + resp, req, err := t.Get(versionURL.String()) + if err != nil { + return "", err + } + defer resp.Body.Close() - t.CancelWhenTimeout(req) + t.CancelWhenTimeout(req) - body, err := ioutil.ReadAll(resp.Body) + body, err := ioutil.ReadAll(resp.Body) - return string(body), nil + return string(body), nil } func (s *PeerServer) joinCluster(cluster []string) bool { - for _, machine := range cluster { - if len(machine) == 0 { - continue - } - - err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme) - if err == nil { - log.Debugf("%s success join to the cluster via machine %s", s.name, machine) - return true - - } else { - if _, ok := err.(etcdErr.Error); ok { - log.Fatal(err) - } - - log.Debugf("cannot join to cluster via machine %s %s", machine, err) - } - } - return false + for _, machine := range cluster { + if len(machine) == 0 { + continue + } + + err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme) + if err == nil { + log.Debugf("%s success join to the cluster via machine %s", s.name, machine) + return true + + } else { + if _, ok := err.(etcdErr.Error); ok { + log.Fatal(err) + } + + log.Debugf("cannot join to cluster via machine %s %s", machine, err) + } + } + return false } // Send join requests to machine. func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error { - var b bytes.Buffer + var b bytes.Buffer - // t must be ok - t, _ := server.Transporter().(*transporter) + // t must be ok + t, _ := server.Transporter().(*transporter) - // Our version must match the leaders version - versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} - version, err := getVersion(t, versionURL) - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } + // Our version must match the leaders version + versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} + version, err := getVersion(t, versionURL) + if err != nil { + return fmt.Errorf("Unable to join: %v", err) + } - // TODO: versioning of the internal protocol. See: - // Documentation/internatl-protocol-versioning.md - if version != PeerVersion { - return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") - } + // TODO: versioning of the internal protocol. See: + // Documentation/internatl-protocol-versioning.md + if version != PeerVersion { + return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") + } - json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) + json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) - joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} + joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} - log.Debugf("Send Join Request to %s", joinURL.String()) + log.Debugf("Send Join Request to %s", joinURL.String()) - resp, req, err := t.Post(joinURL.String(), &b) + resp, req, err := t.Post(joinURL.String(), &b) - for { - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } - if resp != nil { - defer resp.Body.Close() + for { + if err != nil { + return fmt.Errorf("Unable to join: %v", err) + } + if resp != nil { + defer resp.Body.Close() - t.CancelWhenTimeout(req) + t.CancelWhenTimeout(req) - if resp.StatusCode == http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - s.joinIndex, _ = binary.Uvarint(b) - return nil - } - if resp.StatusCode == http.StatusTemporaryRedirect { + if resp.StatusCode == http.StatusOK { + b, _ := ioutil.ReadAll(resp.Body) + s.joinIndex, _ = binary.Uvarint(b) + return nil + } + if resp.StatusCode == http.StatusTemporaryRedirect { - address := resp.Header.Get("Location") - log.Debugf("Send Join Request to %s", address) + address := resp.Header.Get("Location") + log.Debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) + json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) - resp, req, err = t.Post(address, &b) + resp, req, err = t.Post(address, &b) - } else if resp.StatusCode == http.StatusBadRequest { - log.Debug("Reach max number machines in the cluster") - decoder := json.NewDecoder(resp.Body) - err := &etcdErr.Error{} - decoder.Decode(err) - return *err - } else { - return fmt.Errorf("Unable to join") - } - } + } else if resp.StatusCode == http.StatusBadRequest { + log.Debug("Reach max number machines in the cluster") + decoder := json.NewDecoder(resp.Body) + err := &etcdErr.Error{} + decoder.Decode(err) + return *err + } else { + return fmt.Errorf("Unable to join") + } + } - } - return fmt.Errorf("Unable to join: %v", err) + } } func (s *PeerServer) Stats() []byte { - s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String() + s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String() - queue := s.serverStats.sendRateQueue + queue := s.serverStats.sendRateQueue - s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate() + s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate() - queue = s.serverStats.recvRateQueue + queue = s.serverStats.recvRateQueue - s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate() + s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate() - b, _ := json.Marshal(s.serverStats) + b, _ := json.Marshal(s.serverStats) - return b + return b } func (s *PeerServer) PeerStats() []byte { - if s.State() == raft.Leader { - b, _ := json.Marshal(s.followersStats) - return b - } - return nil + if s.State() == raft.Leader { + b, _ := json.Marshal(s.followersStats) + return b + } + return nil } func (s *PeerServer) monitorSnapshot() { - for { - time.Sleep(s.snapConf.checkingInterval) - currentWrites := 0 - if uint64(currentWrites) > s.snapConf.writesThr { - s.TakeSnapshot() - s.snapConf.lastWrites = 0 - } - } + for { + time.Sleep(s.snapConf.checkingInterval) + currentWrites := 0 + if uint64(currentWrites) > s.snapConf.writesThr { + s.TakeSnapshot() + s.snapConf.lastWrites = 0 + } + } } func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { - if s.State() == raft.Leader { - if response, err := s.Do(c); err != nil { - return err - } else { - if response == nil { - return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm) - } - - event, ok := response.(*store.Event) - if ok { - bytes, err := json.Marshal(event) - if err != nil { - fmt.Println(err) - } - - w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) - w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) - w.WriteHeader(http.StatusOK) - w.Write(bytes) - - return nil - } - - bytes, _ := response.([]byte) - w.WriteHeader(http.StatusOK) - w.Write(bytes) - - return nil - } - - } else { - leader := s.Leader() - // current no leader - if leader == "" { - return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) - } - url, _ := s.registry.PeerURL(leader) - - redirect(url, w, req) - - return nil - } + if s.State() == raft.Leader { + if response, err := s.Do(c); err != nil { + return err + } else { + if response == nil { + return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm) + } + + event, ok := response.(*store.Event) + if ok { + bytes, err := json.Marshal(event) + if err != nil { + fmt.Println(err) + } + + w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) + w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) + w.WriteHeader(http.StatusOK) + w.Write(bytes) + + return nil + } + + bytes, _ := response.([]byte) + w.WriteHeader(http.StatusOK) + w.Write(bytes) + + return nil + } + + } else { + leader := s.Leader() + // current no leader + if leader == "" { + return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) + } + url, _ := s.registry.PeerURL(leader) + + redirect(url, w, req) + + return nil + } } - diff --git a/server/registry.go b/server/registry.go index 8ee2406ed70..7335b88ea10 100644 --- a/server/registry.go +++ b/server/registry.go @@ -1,14 +1,14 @@ package server import ( - "fmt" - "net/url" - "path" - "strings" - "sync" - - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" + "fmt" + "net/url" + "path" + "strings" + "sync" + + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" ) // The location of the machine URL data. @@ -16,174 +16,174 @@ const RegistryKey = "/_etcd/machines" // The Registry stores URL information for nodes. type Registry struct { - sync.Mutex - store *store.Store - nodes map[string]*node + sync.Mutex + store *store.Store + nodes map[string]*node } // The internal storage format of the registry. type node struct { - peerVersion string - peerURL string - url string + peerVersion string + peerURL string + url string } // Creates a new Registry. func NewRegistry(s *store.Store) *Registry { - return &Registry{ - store: s, - nodes: make(map[string]*node), - } + return &Registry{ + store: s, + nodes: make(map[string]*node), + } } // Adds a node to the registry. func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) { - r.Lock() - defer r.Unlock() + r.Lock() + defer r.Unlock() - // Write data to store. - key := path.Join(RegistryKey, name) - value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion) - r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) + // Write data to store. + key := path.Join(RegistryKey, name) + value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion) + r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) } // Removes a node from the registry. func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error { - r.Lock() - defer r.Unlock() + r.Lock() + defer r.Unlock() - // Remove the key from the store. - _, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term) - return err + // Remove the key from the store. + _, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term) + return err } // Returns the number of nodes in the cluster. func (r *Registry) Count() int { - e, err := r.store.Get(RegistryKey, false, false, 0, 0) - if err != nil { - return 0 - } - return len(e.KVPairs) + e, err := r.store.Get(RegistryKey, false, false, 0, 0) + if err != nil { + return 0 + } + return len(e.KVPairs) } // Retrieves the URL for a given node by name. func (r *Registry) URL(name string) (string, bool) { - r.Lock() - defer r.Unlock() - return r.url(name) + r.Lock() + defer r.Unlock() + return r.url(name) } func (r *Registry) url(name string) (string, bool) { - if r.nodes[name] == nil { - r.load(name) - } + if r.nodes[name] == nil { + r.load(name) + } - if node := r.nodes[name]; node != nil { - return node.url, true - } + if node := r.nodes[name]; node != nil { + return node.url, true + } - return "", false + return "", false } // Retrieves the URLs for all nodes. func (r *Registry) URLs(leaderName, selfName string) []string { - r.Lock() - defer r.Unlock() - - // Build list including the leader and self. - urls := make([]string, 0) - if url, _ := r.url(leaderName); len(url) > 0 { - urls = append(urls, url) - } - if url, _ := r.url(selfName); len(url) > 0 { - urls = append(urls, url) - } - - // Retrieve a list of all nodes. - if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { - // Lookup the URL for each one. - for _, pair := range e.KVPairs { - if url, _ := r.url(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName { - urls = append(urls, url) - } - } - } - - log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) - - return urls + r.Lock() + defer r.Unlock() + + // Build list including the leader and self. + urls := make([]string, 0) + if url, _ := r.url(leaderName); len(url) > 0 { + urls = append(urls, url) + } + if url, _ := r.url(selfName); len(url) > 0 { + urls = append(urls, url) + } + + // Retrieve a list of all nodes. + if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { + // Lookup the URL for each one. + for _, pair := range e.KVPairs { + if url, _ := r.url(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName { + urls = append(urls, url) + } + } + } + + log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) + + return urls } // Retrieves the peer URL for a given node by name. func (r *Registry) PeerURL(name string) (string, bool) { - r.Lock() - defer r.Unlock() - return r.peerURL(name) + r.Lock() + defer r.Unlock() + return r.peerURL(name) } func (r *Registry) peerURL(name string) (string, bool) { - if r.nodes[name] == nil { - r.load(name) - } + if r.nodes[name] == nil { + r.load(name) + } - if node := r.nodes[name]; node != nil { - return node.peerURL, true - } + if node := r.nodes[name]; node != nil { + return node.peerURL, true + } - return "", false + return "", false } // Retrieves the peer URLs for all nodes. func (r *Registry) PeerURLs(leaderName, selfName string) []string { - r.Lock() - defer r.Unlock() - - // Build list including the leader and self. - urls := make([]string, 0) - if url, _ := r.peerURL(leaderName); len(url) > 0 { - urls = append(urls, url) - } - if url, _ := r.peerURL(selfName); len(url) > 0 { - urls = append(urls, url) - } - - // Retrieve a list of all nodes. - if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { - // Lookup the URL for each one. - for _, pair := range e.KVPairs { - if url, _ := r.peerURL(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName { - urls = append(urls, url) - } - } - } - - log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) - - return urls + r.Lock() + defer r.Unlock() + + // Build list including the leader and self. + urls := make([]string, 0) + if url, _ := r.peerURL(leaderName); len(url) > 0 { + urls = append(urls, url) + } + if url, _ := r.peerURL(selfName); len(url) > 0 { + urls = append(urls, url) + } + + // Retrieve a list of all nodes. + if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { + // Lookup the URL for each one. + for _, pair := range e.KVPairs { + if url, _ := r.peerURL(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName { + urls = append(urls, url) + } + } + } + + log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) + + return urls } // Loads the given node by name from the store into the cache. func (r *Registry) load(name string) { - if name == "" { - return - } - - // Retrieve from store. - e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0) - if err != nil { - return - } - - // Parse as a query string. - m, err := url.ParseQuery(e.Value) - if err != nil { - panic(fmt.Sprintf("Failed to parse machines entry: %s", name)) - } - - // Create node. - r.nodes[name] = &node{ - url: m["etcd"][0], - peerURL: m["raft"][0], - peerVersion: m["raftVersion"][0], - } + if name == "" { + return + } + + // Retrieve from store. + e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0) + if err != nil { + return + } + + // Parse as a query string. + m, err := url.ParseQuery(e.Value) + if err != nil { + panic(fmt.Sprintf("Failed to parse machines entry: %s", name)) + } + + // Create node. + r.nodes[name] = &node{ + url: m["etcd"][0], + peerURL: m["raft"][0], + peerVersion: m["raftVersion"][0], + } } diff --git a/server/server.go b/server/server.go index 98492711020..76a62fdce70 100644 --- a/server/server.go +++ b/server/server.go @@ -1,8 +1,8 @@ package server import ( - "fmt" "encoding/json" + "fmt" "net/http" "net/url" "strings" @@ -38,6 +38,7 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI TLSConfig: &tlsConf.Server, Addr: listenHost, }, + name: name, store: store, registry: registry, url: urlStr, @@ -134,7 +135,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque // Wrap the standard HandleFunc interface to pass in the server reference. return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { // Log request. - log.Debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr) + log.Debugf("[recv] %s %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr) // Write CORS header. if s.OriginAllowed("*") { @@ -242,28 +243,28 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro // Handler to return all the known machines in the current cluster. func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error { - machines := s.registry.URLs(s.peerServer.Leader(), s.name) - w.WriteHeader(http.StatusOK) - w.Write([]byte(strings.Join(machines, ", "))) - return nil + machines := s.registry.URLs(s.peerServer.Leader(), s.name) + w.WriteHeader(http.StatusOK) + w.Write([]byte(strings.Join(machines, ", "))) + return nil } // Retrieves stats on the Raft server. func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error { - w.Write(s.peerServer.Stats()) - return nil + w.Write(s.peerServer.Stats()) + return nil } // Retrieves stats on the leader. func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error { if s.peerServer.State() == raft.Leader { - w.Write(s.peerServer.PeerStats()) - return nil + w.Write(s.peerServer.PeerStats()) + return nil } leader := s.peerServer.Leader() if leader == "" { - return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) + return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) } hostname, _ := s.registry.URL(leader) redirect(hostname, w, req) @@ -272,8 +273,8 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) // Retrieves stats on the leader. func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error { - w.Write(s.store.JsonStats()) - return nil + w.Write(s.store.JsonStats()) + return nil } // Executes a speed test to evaluate the performance of update replication. @@ -284,8 +285,8 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro go func() { for j := 0; j < 10; j++ { c := &store.UpdateCommand{ - Key: "foo", - Value: "bar", + Key: "foo", + Value: "bar", ExpireTime: time.Unix(0, 0), } s.peerServer.Do(c) diff --git a/server/timeout.go b/server/timeout.go index 321e77b38f7..fa48c316215 100644 --- a/server/timeout.go +++ b/server/timeout.go @@ -1,15 +1,15 @@ package server import ( - "time" + "time" ) const ( - // The amount of time to elapse without a heartbeat before becoming a candidate. - ElectionTimeout = 200 * time.Millisecond + // The amount of time to elapse without a heartbeat before becoming a candidate. + ElectionTimeout = 200 * time.Millisecond - // The frequency by which heartbeats are sent to followers. - HeartbeatTimeout = 50 * time.Millisecond + // The frequency by which heartbeats are sent to followers. + HeartbeatTimeout = 50 * time.Millisecond - RetryInterval = 10 + RetryInterval = 10 ) diff --git a/server/transporter.go b/server/transporter.go index 7397d45342b..03928e2b3ba 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -1,17 +1,17 @@ package server import ( - "bytes" - "crypto/tls" - "encoding/json" - "fmt" - "io" - "net" - "net/http" - "time" - - "github.com/coreos/etcd/log" - "github.com/coreos/go-raft" + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "time" + + "github.com/coreos/etcd/log" + "github.com/coreos/go-raft" ) // Timeout for setup internal raft http connection @@ -28,200 +28,200 @@ var tranTimeout = ElectionTimeout // Transporter layer for communication between raft nodes type transporter struct { - client *http.Client - transport *http.Transport - peerServer *PeerServer + client *http.Client + transport *http.Transport + peerServer *PeerServer } // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { - t := transporter{} + t := transporter{} - tr := &http.Transport{ - Dial: dialWithTimeout, - ResponseHeaderTimeout: responseHeaderTimeout, - } + tr := &http.Transport{ + Dial: dialWithTimeout, + ResponseHeaderTimeout: responseHeaderTimeout, + } - if scheme == "https" { - tr.TLSClientConfig = &tlsConf - tr.DisableCompression = true - } + if scheme == "https" { + tr.TLSClientConfig = &tlsConf + tr.DisableCompression = true + } - t.client = &http.Client{Transport: tr} - t.transport = tr - t.peerServer = peerServer + t.client = &http.Client{Transport: tr} + t.transport = tr + t.peerServer = peerServer - return &t + return &t } // Dial with timeout func dialWithTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, dailTimeout) + return net.DialTimeout(network, addr, dailTimeout) } // Sends AppendEntries RPCs to a peer when the server is the leader. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { - var aersp *raft.AppendEntriesResponse - var b bytes.Buffer + var aersp *raft.AppendEntriesResponse + var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + json.NewEncoder(&b).Encode(req) - size := b.Len() + size := b.Len() - t.peerServer.serverStats.SendAppendReq(size) + t.peerServer.serverStats.SendAppendReq(size) - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.peerServer.registry.PeerURL(peer.Name) - log.Debugf("Send LogEntries to %s ", u) + log.Debugf("Send LogEntries to %s ", u) - thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] + thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] - if !ok { //this is the first time this follower has been seen - thisFollowerStats = &raftFollowerStats{} - thisFollowerStats.Latency.Minimum = 1 << 63 - t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats - } + if !ok { //this is the first time this follower has been seen + thisFollowerStats = &raftFollowerStats{} + thisFollowerStats.Latency.Minimum = 1 << 63 + t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats + } - start := time.Now() + start := time.Now() - resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) - end := time.Now() + end := time.Now() - if err != nil { - log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) - if ok { - thisFollowerStats.Fail() - } - } else { - if ok { - thisFollowerStats.Succ(end.Sub(start)) - } - } + if err != nil { + log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) + if ok { + thisFollowerStats.Fail() + } + } else { + if ok { + thisFollowerStats.Succ(end.Sub(start)) + } + } - if resp != nil { - defer resp.Body.Close() + if resp != nil { + defer resp.Body.Close() - t.CancelWhenTimeout(httpRequest) + t.CancelWhenTimeout(httpRequest) - aersp = &raft.AppendEntriesResponse{} - if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp - } + aersp = &raft.AppendEntriesResponse{} + if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + return aersp + } - } + } - return aersp + return aersp } // Sends RequestVote RPCs to a peer when the server is the candidate. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { - var rvrsp *raft.RequestVoteResponse - var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + var rvrsp *raft.RequestVoteResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) - u, _ := t.peerServer.registry.PeerURL(peer.Name) - log.Debugf("Send Vote to %s", u) + u, _ := t.peerServer.registry.PeerURL(peer.Name) + log.Debugf("Send Vote to %s", u) - resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) - if err != nil { - log.Debugf("Cannot send VoteRequest to %s : %s", u, err) - } + if err != nil { + log.Debugf("Cannot send VoteRequest to %s : %s", u, err) + } - if resp != nil { - defer resp.Body.Close() + if resp != nil { + defer resp.Body.Close() - t.CancelWhenTimeout(httpRequest) + t.CancelWhenTimeout(httpRequest) - rvrsp := &raft.RequestVoteResponse{} - if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { - return rvrsp - } + rvrsp := &raft.RequestVoteResponse{} + if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { + return rvrsp + } - } - return rvrsp + } + return rvrsp } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { - var aersp *raft.SnapshotResponse - var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + var aersp *raft.SnapshotResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) - u, _ := t.peerServer.registry.PeerURL(peer.Name) - log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) + u, _ := t.peerServer.registry.PeerURL(peer.Name) + log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, + req.LastTerm, req.LastIndex) - resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) - if err != nil { - log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) - } + if err != nil { + log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) + } - if resp != nil { - defer resp.Body.Close() + if resp != nil { + defer resp.Body.Close() - t.CancelWhenTimeout(httpRequest) + t.CancelWhenTimeout(httpRequest) - aersp = &raft.SnapshotResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + aersp = &raft.SnapshotResponse{} + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp - } - } + return aersp + } + } - return aersp + return aersp } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { - var aersp *raft.SnapshotRecoveryResponse - var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + var aersp *raft.SnapshotRecoveryResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) - u, _ := t.peerServer.registry.PeerURL(peer.Name) - log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) + u, _ := t.peerServer.registry.PeerURL(peer.Name) + log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, + req.LastTerm, req.LastIndex) - resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) + resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) - if err != nil { - log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) - } + if err != nil { + log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) + } - if resp != nil { - defer resp.Body.Close() - aersp = &raft.SnapshotRecoveryResponse{} + if resp != nil { + defer resp.Body.Close() + aersp = &raft.SnapshotRecoveryResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp - } - } + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + return aersp + } + } - return aersp + return aersp } // Send server side POST request func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("POST", urlStr, body) - resp, err := t.client.Do(req) - return resp, req, err + req, _ := http.NewRequest("POST", urlStr, body) + resp, err := t.client.Do(req) + return resp, req, err } // Send server side GET request func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("GET", urlStr, nil) - resp, err := t.client.Do(req) - return resp, req, err + req, _ := http.NewRequest("GET", urlStr, nil) + resp, err := t.client.Do(req) + return resp, req, err } // Cancel the on fly HTTP transaction when timeout happens. func (t *transporter) CancelWhenTimeout(req *http.Request) { - go func() { - time.Sleep(ElectionTimeout) - t.transport.CancelRequest(req) - }() + go func() { + time.Sleep(ElectionTimeout) + t.transport.CancelRequest(req) + }() } diff --git a/server/util.go b/server/util.go index 95d93c17950..6a53884baa2 100644 --- a/server/util.go +++ b/server/util.go @@ -1,27 +1,26 @@ package server import ( - "encoding/json" - "fmt" - "io" - "net/http" + "encoding/json" + "fmt" + "io" + "net/http" - "github.com/coreos/etcd/log" + "github.com/coreos/etcd/log" ) func decodeJsonRequest(req *http.Request, data interface{}) error { - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&data); err != nil && err != io.EOF { - log.Warnf("Malformed json request: %v", err) - return fmt.Errorf("Malformed json request: %v", err) - } - return nil + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&data); err != nil && err != io.EOF { + log.Warnf("Malformed json request: %v", err) + return fmt.Errorf("Malformed json request: %v", err) + } + return nil } func redirect(hostname string, w http.ResponseWriter, req *http.Request) { - path := req.URL.Path - url := hostname + path - log.Debugf("Redirect to %s", url) - http.Redirect(w, req, url, http.StatusTemporaryRedirect) + path := req.URL.Path + url := hostname + path + log.Debugf("Redirect to %s", url) + http.Redirect(w, req, url, http.StatusTemporaryRedirect) } - diff --git a/server/v1/delete_key_handler.go b/server/v1/delete_key_handler.go index a657e9f3d77..1288d6597e1 100644 --- a/server/v1/delete_key_handler.go +++ b/server/v1/delete_key_handler.go @@ -1,9 +1,9 @@ package v1 import ( + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" "net/http" - "github.com/coreos/etcd/store" - "github.com/gorilla/mux" ) // Removes a key from the store. diff --git a/server/v1/v1.go b/server/v1/v1.go index 586a08e67e9..f71ed06220e 100644 --- a/server/v1/v1.go +++ b/server/v1/v1.go @@ -1,9 +1,9 @@ package v1 import ( - "net/http" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" + "net/http" ) // The Server interface provides all the methods required for the v1 API. diff --git a/server/v2/create_key_handler.go b/server/v2/create_key_handler.go index 8a9fbf50e85..fab9bde1f9c 100644 --- a/server/v2/create_key_handler.go +++ b/server/v2/create_key_handler.go @@ -1,29 +1,29 @@ package v2 import ( - "net/http" + "net/http" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/gorilla/mux" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" ) func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] + vars := mux.Vars(req) + key := "/" + vars["key"] - value := req.FormValue("value") - expireTime, err := store.TTL(req.FormValue("ttl")) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) - } + value := req.FormValue("value") + expireTime, err := store.TTL(req.FormValue("ttl")) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) + } - c := &store.CreateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - IncrementalSuffix: (req.FormValue("incremental") == "true"), - } + c := &store.CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + IncrementalSuffix: (req.FormValue("incremental") == "true"), + } - return s.Dispatch(c, w, req) + return s.Dispatch(c, w, req) } diff --git a/server/v2/delete_key_handler.go b/server/v2/delete_key_handler.go index e3bdf2b2d27..c53e7245983 100644 --- a/server/v2/delete_key_handler.go +++ b/server/v2/delete_key_handler.go @@ -1,20 +1,20 @@ package v2 import ( - "net/http" + "net/http" - "github.com/coreos/etcd/store" - "github.com/gorilla/mux" + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" ) func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] + vars := mux.Vars(req) + key := "/" + vars["key"] - c := &store.DeleteCommand{ - Key: key, - Recursive: (req.FormValue("recursive") == "true"), - } + c := &store.DeleteCommand{ + Key: key, + Recursive: (req.FormValue("recursive") == "true"), + } - return s.Dispatch(c, w, req) + return s.Dispatch(c, w, req) } diff --git a/server/v2/get_key_handler.go b/server/v2/get_key_handler.go index e4d9b7207e3..e6cce6e8bd2 100644 --- a/server/v2/get_key_handler.go +++ b/server/v2/get_key_handler.go @@ -1,69 +1,69 @@ package v2 import ( - "encoding/json" - "fmt" - "net/http" - "strconv" + "encoding/json" + "fmt" + "net/http" + "strconv" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" - "github.com/gorilla/mux" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "github.com/gorilla/mux" ) func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { - var err error - var event *store.Event + var err error + var event *store.Event - vars := mux.Vars(req) - key := "/" + vars["key"] + vars := mux.Vars(req) + key := "/" + vars["key"] - // Help client to redirect the request to the current leader - if req.FormValue("consistent") == "true" && s.State() != raft.Leader { - leader := s.Leader() - hostname, _ := s.PeerURL(leader) - url := hostname + req.URL.Path - log.Debugf("Redirect to %s", url) - http.Redirect(w, req, url, http.StatusTemporaryRedirect) - return nil - } + // Help client to redirect the request to the current leader + if req.FormValue("consistent") == "true" && s.State() != raft.Leader { + leader := s.Leader() + hostname, _ := s.PeerURL(leader) + url := hostname + req.URL.Path + log.Debugf("Redirect to %s", url) + http.Redirect(w, req, url, http.StatusTemporaryRedirect) + return nil + } - recursive := (req.FormValue("recursive") == "true") - sorted := (req.FormValue("sorted") == "true") + recursive := (req.FormValue("recursive") == "true") + sorted := (req.FormValue("sorted") == "true") - if req.FormValue("wait") == "true" { // watch - // Create a command to watch from a given index (default 0). - var sinceIndex uint64 = 0 - if req.Method == "POST" { - sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) - } - } + if req.FormValue("wait") == "true" { // watch + // Create a command to watch from a given index (default 0). + var sinceIndex uint64 = 0 + if req.Method == "POST" { + sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) + } + } - // Start the watcher on the store. - c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term()) - if err != nil { - return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm) - } - event = <-c + // Start the watcher on the store. + c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term()) + if err != nil { + return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm) + } + event = <-c - } else { //get - // Retrieve the key from the store. - event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term()) - if err != nil { - return err - } - } + } else { //get + // Retrieve the key from the store. + event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term()) + if err != nil { + return err + } + } - w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) - w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) - w.WriteHeader(http.StatusOK) + w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) + w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) + w.WriteHeader(http.StatusOK) - b, _ := json.Marshal(event) - w.Write(b) + b, _ := json.Marshal(event) + w.Write(b) - return nil + return nil } diff --git a/server/v2/update_key_handler.go b/server/v2/update_key_handler.go index 64e60cca5f1..841c8828de4 100644 --- a/server/v2/update_key_handler.go +++ b/server/v2/update_key_handler.go @@ -1,64 +1,64 @@ package v2 import ( - "net/http" - "strconv" + "net/http" + "strconv" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" - "github.com/gorilla/mux" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "github.com/gorilla/mux" ) func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] + vars := mux.Vars(req) + key := "/" + vars["key"] - req.ParseForm() + req.ParseForm() - value := req.Form.Get("value") - expireTime, err := store.TTL(req.Form.Get("ttl")) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) - } + value := req.Form.Get("value") + expireTime, err := store.TTL(req.Form.Get("ttl")) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) + } - // Update should give at least one option - if value == "" && expireTime.Sub(store.Permanent) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) - } + // Update should give at least one option + if value == "" && expireTime.Sub(store.Permanent) == 0 { + return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) + } - prevValue, valueOk := req.Form["prevValue"] - prevIndexStr, indexOk := req.Form["prevIndex"] + prevValue, valueOk := req.Form["prevValue"] + prevIndexStr, indexOk := req.Form["prevIndex"] - var c raft.Command - if !valueOk && !indexOk { // update without test - c = &store.UpdateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } + var c raft.Command + if !valueOk && !indexOk { // update without test + c = &store.UpdateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } - } else { // update with test - var prevIndex uint64 + } else { // update with test + var prevIndex uint64 - if indexOk { - prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) + if indexOk { + prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) - } - } else { - prevIndex = 0 - } + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) + } + } else { + prevIndex = 0 + } - c = &store.TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue[0], - PrevIndex: prevIndex, - } - } + c = &store.TestAndSetCommand{ + Key: key, + Value: value, + PrevValue: prevValue[0], + PrevIndex: prevIndex, + } + } - return s.Dispatch(c, w, req) + return s.Dispatch(c, w, req) } diff --git a/server/v2/v2.go b/server/v2/v2.go index 439f6078e42..e412e859b23 100644 --- a/server/v2/v2.go +++ b/server/v2/v2.go @@ -1,18 +1,18 @@ package v2 import ( - "net/http" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "net/http" ) // The Server interface provides all the methods required for the v2 API. type Server interface { - State() string - Leader() string - CommitIndex() uint64 - Term() uint64 - PeerURL(string) (string, bool) - Store() *store.Store - Dispatch(raft.Command, http.ResponseWriter, *http.Request) error + State() string + Leader() string + CommitIndex() uint64 + Term() uint64 + PeerURL(string) (string, bool) + Store() *store.Store + Dispatch(raft.Command, http.ResponseWriter, *http.Request) error } diff --git a/store/create_command.go b/store/create_command.go index 0263347a655..2ccddd1035c 100644 --- a/store/create_command.go +++ b/store/create_command.go @@ -7,7 +7,7 @@ import ( ) func init() { - raft.RegisterCommand(&CreateCommand{}) + raft.RegisterCommand(&CreateCommand{}) } // Create command diff --git a/store/delete_command.go b/store/delete_command.go index 3ac48bc3570..324410192eb 100644 --- a/store/delete_command.go +++ b/store/delete_command.go @@ -6,7 +6,7 @@ import ( ) func init() { - raft.RegisterCommand(&DeleteCommand{}) + raft.RegisterCommand(&DeleteCommand{}) } // The DeleteCommand removes a key from the Store. diff --git a/store/event.go b/store/event.go index 48420186560..0d9ec0a37de 100644 --- a/store/event.go +++ b/store/event.go @@ -77,5 +77,3 @@ func (event *Event) Response() interface{} { return responses } } - - diff --git a/store/event_history.go b/store/event_history.go index 73db5d87671..3ddd3820690 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -1,112 +1,112 @@ package store import ( - "fmt" - "strings" - "sync" + "fmt" + "strings" + "sync" - etcdErr "github.com/coreos/etcd/error" + etcdErr "github.com/coreos/etcd/error" ) type EventHistory struct { - Queue eventQueue - StartIndex uint64 - LastIndex uint64 - LastTerm uint64 - DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue - rwl sync.RWMutex + Queue eventQueue + StartIndex uint64 + LastIndex uint64 + LastTerm uint64 + DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue + rwl sync.RWMutex } func newEventHistory(capacity int) *EventHistory { - return &EventHistory{ - Queue: eventQueue{ - Capacity: capacity, - Events: make([]*Event, capacity), - }, - } + return &EventHistory{ + Queue: eventQueue{ + Capacity: capacity, + Events: make([]*Event, capacity), + }, + } } // addEvent function adds event into the eventHistory func (eh *EventHistory) addEvent(e *Event) *Event { - eh.rwl.Lock() - defer eh.rwl.Unlock() + eh.rwl.Lock() + defer eh.rwl.Unlock() - var duped uint64 + var duped uint64 - if e.Index == UndefIndex { - e.Index = eh.LastIndex - e.Term = eh.LastTerm - duped = 1 - } + if e.Index == UndefIndex { + e.Index = eh.LastIndex + e.Term = eh.LastTerm + duped = 1 + } - eh.Queue.insert(e) + eh.Queue.insert(e) - eh.LastIndex = e.Index - eh.LastTerm = e.Term - eh.DupCnt += duped + eh.LastIndex = e.Index + eh.LastTerm = e.Term + eh.DupCnt += duped - eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index + eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index - return e + return e } // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) { - eh.rwl.RLock() - defer eh.rwl.RUnlock() - - start := index - eh.StartIndex - - // the index should locate after the event history's StartIndex - if start < 0 { - return nil, - etcdErr.NewError(etcdErr.EcodeEventIndexCleared, - fmt.Sprintf("the requested history has been cleared [%v/%v]", - eh.StartIndex, index), UndefIndex, UndefTerm) - } - - // the index should locate before the size of the queue minus the duplicate count - if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index - return nil, nil - } - - i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) - - for { - e := eh.Queue.Events[i] - if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one - return e, nil - } - - i = (i + 1) % eh.Queue.Capacity - - if i == eh.Queue.back() { // find nothing, return and watch from current index - return nil, nil - } - } + eh.rwl.RLock() + defer eh.rwl.RUnlock() + + start := index - eh.StartIndex + + // the index should locate after the event history's StartIndex + if start < 0 { + return nil, + etcdErr.NewError(etcdErr.EcodeEventIndexCleared, + fmt.Sprintf("the requested history has been cleared [%v/%v]", + eh.StartIndex, index), UndefIndex, UndefTerm) + } + + // the index should locate before the size of the queue minus the duplicate count + if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index + return nil, nil + } + + i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) + + for { + e := eh.Queue.Events[i] + if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one + return e, nil + } + + i = (i + 1) % eh.Queue.Capacity + + if i == eh.Queue.back() { // find nothing, return and watch from current index + return nil, nil + } + } } // clone will be protected by a stop-world lock // do not need to obtain internal lock func (eh *EventHistory) clone() *EventHistory { - clonedQueue := eventQueue{ - Capacity: eh.Queue.Capacity, - Events: make([]*Event, eh.Queue.Capacity), - Size: eh.Queue.Size, - Front: eh.Queue.Front, - } - - for i, e := range eh.Queue.Events { - clonedQueue.Events[i] = e - } - - return &EventHistory{ - StartIndex: eh.StartIndex, - Queue: clonedQueue, - LastIndex: eh.LastIndex, - LastTerm: eh.LastTerm, - DupCnt: eh.DupCnt, - } + clonedQueue := eventQueue{ + Capacity: eh.Queue.Capacity, + Events: make([]*Event, eh.Queue.Capacity), + Size: eh.Queue.Size, + Front: eh.Queue.Front, + } + + for i, e := range eh.Queue.Events { + clonedQueue.Events[i] = e + } + + return &EventHistory{ + StartIndex: eh.StartIndex, + Queue: clonedQueue, + LastIndex: eh.LastIndex, + LastTerm: eh.LastTerm, + DupCnt: eh.DupCnt, + } } diff --git a/store/event_queue.go b/store/event_queue.go index 7c520ffe4dc..0852956b1b8 100644 --- a/store/event_queue.go +++ b/store/event_queue.go @@ -1,26 +1,25 @@ package store - type eventQueue struct { - Events []*Event - Size int - Front int - Capacity int + Events []*Event + Size int + Front int + Capacity int } func (eq *eventQueue) back() int { - return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity + return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity } func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity + index := (eq.back() + 1) % eq.Capacity - eq.Events[index] = e + eq.Events[index] = e - if eq.Size == eq.Capacity { //dequeue - eq.Front = (index + 1) % eq.Capacity - } else { - eq.Size++ - } + if eq.Size == eq.Capacity { //dequeue + eq.Front = (index + 1) % eq.Capacity + } else { + eq.Size++ + } } diff --git a/store/test_and_set_command.go b/store/test_and_set_command.go index cf4167d1cdd..3370fed154e 100644 --- a/store/test_and_set_command.go +++ b/store/test_and_set_command.go @@ -8,7 +8,7 @@ import ( ) func init() { - raft.RegisterCommand(&TestAndSetCommand{}) + raft.RegisterCommand(&TestAndSetCommand{}) } // The TestAndSetCommand performs a conditional update on a key in the store. diff --git a/store/ttl.go b/store/ttl.go index c73d95f8c4e..fec0ec3bfb1 100644 --- a/store/ttl.go +++ b/store/ttl.go @@ -1,21 +1,20 @@ package store import ( - "strconv" - "time" + "strconv" + "time" ) // Convert string duration to time format func TTL(duration string) (time.Time, error) { - if duration != "" { - duration, err := strconv.Atoi(duration) - if err != nil { - return Permanent, err - } - return time.Now().Add(time.Second * (time.Duration)(duration)), nil + if duration != "" { + duration, err := strconv.Atoi(duration) + if err != nil { + return Permanent, err + } + return time.Now().Add(time.Second * (time.Duration)(duration)), nil - } else { - return Permanent, nil - } + } else { + return Permanent, nil + } } - diff --git a/store/update_command.go b/store/update_command.go index 694be9844f3..9ffd6c8686d 100644 --- a/store/update_command.go +++ b/store/update_command.go @@ -8,7 +8,7 @@ import ( ) func init() { - raft.RegisterCommand(&UpdateCommand{}) + raft.RegisterCommand(&UpdateCommand{}) } // The UpdateCommand updates the value of a key in the Store. diff --git a/util.go b/util.go index d519fc1aaca..089eba40cc2 100644 --- a/util.go +++ b/util.go @@ -14,7 +14,6 @@ import ( // HTTP Utilities //-------------------------------------- - // sanitizeURL will cleanup a host string in the format hostname:port and // attach a schema. func sanitizeURL(host string, defaultScheme string) string { @@ -87,4 +86,3 @@ func runCPUProfile() { } }() } -