Skip to content

Commit

Permalink
fix start as a follower(problem with election timeout) and join command
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Jun 10, 2013
1 parent 4ff786b commit b9d789f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
6 changes: 3 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func (c *DeleteCommand) GetKey() string{
}

// joinCommand
type joinCommand struct {
type JoinCommand struct {
Name string `json:"name"`
}

func (c *joinCommand) CommandName() string {
func (c *JoinCommand) CommandName() string {
return "join"
}

func (c *joinCommand) Apply(server *raft.Server) ([]byte, error) {
func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) {
err := server.AddPeer(c.Name)
return nil, err
}
15 changes: 12 additions & 3 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {

func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] POST http://%v/join", server.Name())
command := &joinCommand{}
command := &JoinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
if _, err= server.Do(command); err != nil {
warn("raftd: Unable to join: %v", err)
Expand Down Expand Up @@ -58,6 +58,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
debug("My role is %s", server.State())
if resp, _ := server.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
Expand Down Expand Up @@ -134,10 +135,14 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
var body []byte
var err error


fmt.Println("dispatch")
// unlikely to fail twice
for {
// i am the leader, i will take care of the command
if server.State() == "leader" {
fmt.Println("i am leader ", server.Name())
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -157,6 +162,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
continue
}

fmt.Println("forward to ", leaderName)

path := command.GeneratePath()

if command.Type() == "POST" {
Expand All @@ -167,7 +174,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
reps, _ := http.Post(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()), "application/json", reader)

reps.Body.Read(body)
body, _ := ioutil.ReadAll(reps.Body)
fmt.Println(body)
// good to go
w.WriteHeader(http.StatusOK)

Expand All @@ -179,7 +187,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
reps, _ := http.Get(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()))
// good to go
reps.Body.Read(body)
body, _ := ioutil.ReadAll(reps.Body)
fmt.Println(body)

w.WriteHeader(http.StatusOK)

Expand Down
27 changes: 17 additions & 10 deletions raftd.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
}

// Setup commands.
raft.RegisterCommand(&joinCommand{})
raft.RegisterCommand(&JoinCommand{})
raft.RegisterCommand(&SetCommand{})
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})
Expand All @@ -94,23 +94,30 @@ func main() {
// Setup new raft server.
server, err = raft.NewServer(name, path, t, nil)
//server.DoHandler = DoHandler;
server.SetElectionTimeout(2 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
if err != nil {
fatal("%v", err)
}
server.Start()

server.Initialize()
fmt.Println("1 join as ", server.State(), " term ", server.Term())
// Join to another server if we don't have a log.
if server.IsLogEmpty() {
var leaderHost string
fmt.Println("2 join as ", server.State(), " term ", server.Term())
fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster.");
fmt.Printf("Join to (host:port)> ");
fmt.Scanf("%s", &leaderHost)
fmt.Println("3 join as ", server.State(), " term ", server.Term())
if leaderHost == "" {
server.Initialize()
fmt.Println("init")
server.SetElectionTimeout(1 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
server.StartLeader()
} else {
join(server)
server.SetElectionTimeout(1 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
server.StartFollower()
fmt.Println("4 join as ", server.State(), " term ", server.Term())
Join(server, leaderHost)
fmt.Println("success join")
}
}
Expand Down Expand Up @@ -191,14 +198,14 @@ func getInfo(path string) *Info {
//--------------------------------------

// Send join requests to the leader.
func join(s *raft.Server) error {
func Join(s *raft.Server, serverName string) error {
var b bytes.Buffer
command := &joinCommand{}
command := &JoinCommand{}
command.Name = s.Name()

json.NewEncoder(&b).Encode(command)
debug("[send] POST http://%v/join", "localhost:4001")
resp, err := http.Post(fmt.Sprintf("http://%s/join", "localhost:4001"), "application/json", &b)
resp, err := http.Post(fmt.Sprintf("http://%s/join", serverName), "application/json", &b)
if resp != nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
Expand Down

0 comments on commit b9d789f

Please sign in to comment.