Skip to content

Commit

Permalink
Merge pull request #63 from benbjohnson/http-transporter
Browse files Browse the repository at this point in the history
Default HTTP Transporter
  • Loading branch information
benbjohnson committed Jul 9, 2013
2 parents 1c4b918 + 67d11f7 commit 7cfb6f5
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 13 deletions.
183 changes: 183 additions & 0 deletions http_transporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package raft

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)

// Parts from this transporter were heavily influenced by Peter Bougon's
// raft implementation: https://github.com/peterbourgon/raft

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// An HTTPTransporter is a default transport layer used to communicate between
// multiple servers.
type HTTPTransporter struct {
DisableKeepAlives bool
prefix string
appendEntriesPath string
requestVotePath string
}

type HTTPMuxer interface {
HandleFunc(string, func(http.ResponseWriter, *http.Request))
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter {
return &HTTPTransporter{
prefix: prefix,
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
}
}

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

// Retrieves the path prefix used by the transporter.
func (t *HTTPTransporter) Prefix() string {
return t.prefix
}

// Retrieves the AppendEntries path.
func (t *HTTPTransporter) AppendEntriesPath() string {
return t.appendEntriesPath
}

// Retrieves the RequestVote path.
func (t *HTTPTransporter) RequestVotePath() string {
return t.requestVotePath
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Installation
//--------------------------------------

// Applies Raft routes to an HTTP router for a given server.
func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
}

//--------------------------------------
// Outgoing
//--------------------------------------

// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)

url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
traceln(server.Name(), "POST", url)

client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
httpResp, err := client.Post(url, "application/json", &b)
if httpResp == nil || err != nil {
return nil
}
defer httpResp.Body.Close()

resp := &AppendEntriesResponse{}
if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil && err != io.EOF {
return nil
}

return resp
}

// Sends a RequestVote RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)

url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
traceln(server.Name(), "POST", url)

client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
httpResp, err := client.Post(url, "application/json", &b)
if httpResp == nil || err != nil {
return nil
}
defer httpResp.Body.Close()

resp := &RequestVoteResponse{}
if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil && err != io.EOF {
return nil
}

return resp
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
// TODO
return nil
}

//--------------------------------------
// Incoming
//--------------------------------------

// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /appendEntries")

defer r.Body.Close()
req := &AppendEntriesRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}

resp := server.AppendEntries(req)
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}

// Handles incoming RequestVote requests.
func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /requestVote")

defer r.Body.Close()
req := &RequestVoteRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}

resp := server.RequestVote(req)
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}
111 changes: 111 additions & 0 deletions http_transporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package raft

import (
"fmt"
"net"
"net/http"
"sync"
"testing"
"time"
)

//------------------------------------------------------------------------------
//
// Tests
//
//------------------------------------------------------------------------------

//--------------------------------------
// Membership
//--------------------------------------

// Ensure that we can start several servers and have them communicate.
func TestHTTPTransporter(t *testing.T) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true

servers := []*Server{}
f0 := func(server *Server, httpServer *http.Server) {
// Stop the leader and wait for an election.
server.Stop()
time.Sleep(testElectionTimeout * 2)

if servers[1].State() != Leader && servers[2].State() != Leader {
t.Fatal("Expected re-election:", servers[1].State(), servers[2].State())
}
server.Initialize()
server.StartFollower()
}
f1 := func(server *Server, httpServer *http.Server) {
}
f2 := func(server *Server, httpServer *http.Server) {
}
runTestHttpServers(t, &servers, transporter, f0, f1, f2)
}

//------------------------------------------------------------------------------
//
// Helper Functions
//
//------------------------------------------------------------------------------

// Starts multiple independent Raft servers wrapped with HTTP servers.
func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
var wg sync.WaitGroup
httpServers := []*http.Server{}
listeners := []net.Listener{}
for i, _ := range callbacks {
wg.Add(1)
port := 9000 + i

// Create raft server.
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.SetElectionTimeout(testElectionTimeout)
server.Initialize()
if i == 0 {
server.StartLeader()
} else {
server.StartFollower()
}
defer server.Stop()
*servers = append(*servers, server)

// Create listener for HTTP server and start it.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
defer listener.Close()
listeners = append(listeners, listener)

// Create wrapping HTTP server.
mux := http.NewServeMux()
transporter.Install(server, mux)
httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
httpServers = append(httpServers, httpServer)
go func() { httpServer.Serve(listener) }()
}

// Setup configuration.
for _, server := range *servers {
if _, err := (*servers)[0].Do(&joinCommand{Name: server.Name()}); err != nil {
t.Fatal("Server unable to join: %v", err)
}
}

// Wait for configuration to propagate.
time.Sleep(testHeartbeatTimeout * 2)

// Execute all the callbacks at the same time.
for _i, _f := range callbacks {
i, f := _i, _f
go func() {
defer wg.Done()
f((*servers)[i], httpServers[i])
}()
}

// Wait until everything is done.
wg.Wait()
}
10 changes: 7 additions & 3 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func (p *Peer) stopHeartbeat() {
// I make the channel with 1 buffer
// and try to panic here
select {
case p.stopChan <- true:
case p.stopChan <- true:

default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
}
}

Expand Down Expand Up @@ -128,12 +128,16 @@ func (p *Peer) heartbeat(c chan bool) {

c <- true

debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)

for {
select {
case <-stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return

case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex)

Expand Down
18 changes: 9 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (s *Server) followerLoop() {
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
if update {
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
}

// Exit loop on state change.
Expand Down Expand Up @@ -568,8 +568,6 @@ func (s *Server) leaderLoop() {
var err error
select {
case e := <-s.c:
s.debugln("server.leader.select")

if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(Command); ok {
Expand Down Expand Up @@ -654,6 +652,8 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse

// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
s.traceln("server.ae.process")

if req.Term < s.currentTerm {
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), false
Expand Down Expand Up @@ -786,18 +786,17 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
// Membership
//--------------------------------------

// Adds a peer to the server. This should be called by a system's join command
// within the context so that it is within the context of the server lock.
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
// Do not allow peers to be added twice.
s.debugln("server.peer.add: ", name, len(s.peers))

// Do not allow peers to be added twice.
if s.peers[name] != nil {
return DuplicatePeerError
}

// Only add the peer if it doesn't have the same name.
if s.name != name {
//s.debugln("Add peer ", name)
peer := newPeer(s, name, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
Expand All @@ -808,9 +807,10 @@ func (s *Server) AddPeer(name string) error {
return nil
}

// Removes a peer from the server. This should be called by a system's join command
// within the context so that it is within the context of the server lock.
// Removes a peer from the server.
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))

// Ignore removal of the server itself.
if s.name == name {
return nil
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func TestServerMultiNode(t *testing.T) {
}
mutex.RUnlock()

for i := 0; i < 200000; i++ {
for i := 0; i < 20; i++ {
retry := 0
fmt.Println("Round ", i)

Expand Down

0 comments on commit 7cfb6f5

Please sign in to comment.