From b9d789fb8421154b9f306526a9375265154e2296 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 9 Jun 2013 21:46:30 -0700 Subject: [PATCH] fix start as a follower(problem with election timeout) and join command --- command.go | 6 +++--- handlers.go | 15 ++++++++++++--- raftd.go | 27 +++++++++++++++++---------- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/command.go b/command.go index a1e6adb9322..e4399d2e801 100644 --- a/command.go +++ b/command.go @@ -121,15 +121,15 @@ func (c *DeleteCommand) GetKey() string{ } // joinCommand -type joinCommand struct { +type JoinCommand struct { Name string `json:"name"` } -func (c *joinCommand) CommandName() string { +func (c *JoinCommand) CommandName() string { return "join" } -func (c *joinCommand) Apply(server *raft.Server) ([]byte, error) { +func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) { err := server.AddPeer(c.Name) return nil, err } diff --git a/handlers.go b/handlers.go index 7dd3f4e7cc3..2eadb41c252 100644 --- a/handlers.go +++ b/handlers.go @@ -25,7 +25,7 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { debug("[recv] POST http://%v/join", server.Name()) - command := &joinCommand{} + command := &JoinCommand{} if err := decodeJsonRequest(req, command); err == nil { if _, err= server.Do(command); err != nil { warn("raftd: Unable to join: %v", err) @@ -58,6 +58,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries)) + debug("My role is %s", server.State()) if resp, _ := server.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -134,10 +135,14 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) { func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { var body []byte var err error + + + fmt.Println("dispatch") // unlikely to fail twice for { // i am the leader, i will take care of the command if server.State() == "leader" { + fmt.Println("i am leader ", server.Name()) if body, err = server.Do(command); err != nil { warn("raftd: Unable to write file: %v", err) w.WriteHeader(http.StatusInternalServerError) @@ -157,6 +162,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { continue } + fmt.Println("forward to ", leaderName) + path := command.GeneratePath() if command.Type() == "POST" { @@ -167,7 +174,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { reps, _ := http.Post(fmt.Sprintf("http://%v/%s", leaderName, command.GeneratePath()), "application/json", reader) - reps.Body.Read(body) + body, _ := ioutil.ReadAll(reps.Body) + fmt.Println(body) // good to go w.WriteHeader(http.StatusOK) @@ -179,7 +187,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { reps, _ := http.Get(fmt.Sprintf("http://%v/%s", leaderName, command.GeneratePath())) // good to go - reps.Body.Read(body) + body, _ := ioutil.ReadAll(reps.Body) + fmt.Println(body) w.WriteHeader(http.StatusOK) diff --git a/raftd.go b/raftd.go index a16d35bfdd8..c142b1b57b8 100644 --- a/raftd.go +++ b/raftd.go @@ -68,7 +68,7 @@ func main() { } // Setup commands. - raft.RegisterCommand(&joinCommand{}) + raft.RegisterCommand(&JoinCommand{}) raft.RegisterCommand(&SetCommand{}) raft.RegisterCommand(&GetCommand{}) raft.RegisterCommand(&DeleteCommand{}) @@ -94,23 +94,30 @@ func main() { // Setup new raft server. server, err = raft.NewServer(name, path, t, nil) //server.DoHandler = DoHandler; - server.SetElectionTimeout(2 * time.Second) - server.SetHeartbeatTimeout(1 * time.Second) if err != nil { fatal("%v", err) } - server.Start() - + server.Initialize() + fmt.Println("1 join as ", server.State(), " term ", server.Term()) // Join to another server if we don't have a log. if server.IsLogEmpty() { var leaderHost string + fmt.Println("2 join as ", server.State(), " term ", server.Term()) fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster."); fmt.Printf("Join to (host:port)> "); fmt.Scanf("%s", &leaderHost) + fmt.Println("3 join as ", server.State(), " term ", server.Term()) if leaderHost == "" { - server.Initialize() + fmt.Println("init") + server.SetElectionTimeout(1 * time.Second) + server.SetHeartbeatTimeout(1 * time.Second) + server.StartLeader() } else { - join(server) + server.SetElectionTimeout(1 * time.Second) + server.SetHeartbeatTimeout(1 * time.Second) + server.StartFollower() + fmt.Println("4 join as ", server.State(), " term ", server.Term()) + Join(server, leaderHost) fmt.Println("success join") } } @@ -191,14 +198,14 @@ func getInfo(path string) *Info { //-------------------------------------- // Send join requests to the leader. -func join(s *raft.Server) error { +func Join(s *raft.Server, serverName string) error { var b bytes.Buffer - command := &joinCommand{} + command := &JoinCommand{} command.Name = s.Name() json.NewEncoder(&b).Encode(command) debug("[send] POST http://%v/join", "localhost:4001") - resp, err := http.Post(fmt.Sprintf("http://%s/join", "localhost:4001"), "application/json", &b) + resp, err := http.Post(fmt.Sprintf("http://%s/join", serverName), "application/json", &b) if resp != nil { resp.Body.Close() if resp.StatusCode == http.StatusOK {