Skip to content

Commit

Permalink
Fix test cluster
Browse files Browse the repository at this point in the history
Improve Info endpoint.
Fix server-0 shutdown discipline.

Co-authored-by: pancho horrillo <[email protected]>
  • Loading branch information
2 people authored and iknite committed Feb 21, 2019
1 parent acf201b commit 75b5df1
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 108 deletions.
5 changes: 4 additions & 1 deletion api/apihttp/apihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,10 @@ func InfoHandle(httpEndpoint string, balloon raftwal.RaftBalloonApi) http.Handle
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
out, err := json.Marshal(httpEndpoint)

info := balloon.Info()
info["httpEndpoint"] = "http://" + httpEndpoint
out, err := json.Marshal(info)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
19 changes: 12 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -92,6 +93,17 @@ func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, erro
}
}

func (c *HTTPClient) updateClusterLeader() {
info, _ := c.GetClusterInfo()
if info["isLeader"].(bool) {
c.conf.Cluster.Leader = info["httpEndpoint"].(string)
} else {
time.Sleep(100 * time.Millisecond)
c.conf.Cluster.Leader = c.conf.Cluster.Endpoints[rand.Int()%len(c.conf.Cluster.Endpoints)]
c.updateClusterLeader()
}
}

func (c HTTPClient) GetClusterInfo() (map[string]interface{}, error) {
info := make(map[string]interface{})

Expand Down Expand Up @@ -119,13 +131,6 @@ func (c HTTPClient) GetClusterInfo() (map[string]interface{}, error) {
return info, err
}

func (c *HTTPClient) updateClusterLeader() {
info, _ := c.GetClusterInfo()
if val, ok := info["LeaderAddr"]; ok {
c.conf.Cluster.Leader = val.(string)
}
}

func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
url, err := url.Parse(c.conf.Cluster.Leader + path)
if err != nil {
Expand Down
103 changes: 51 additions & 52 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,49 +191,6 @@ func (b *RaftBalloon) Open(bootstrap bool) error {
return nil
}

// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
// This must be called from the Leader or it will fail.
func (b *RaftBalloon) Join(nodeID, addr string) error {

log.Infof("received join request for remote node %s at %s", nodeID, addr)

configFuture := b.raft.api.GetConfiguration()
if err := configFuture.Error(); err != nil {
log.Errorf("failed to get raft servers configuration: %v", err)
return err
}

for _, srv := range configFuture.Configuration().Servers {
// If a node already exists with either the joining node's ID or address,
// that node may need to be removed from the config first.
if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) {
// However if *both* the ID and the address are the same, then nothing -- not even
// a join operation -- is needed.
if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) {
log.Infof("node %s at %s already member of cluster, ignoring join request", nodeID, addr)
return nil
}

future := b.raft.api.RemoveServer(srv.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err)
}
}
}

f := b.raft.api.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return ErrNotLeader
}
return e.Error()
}

log.Infof("node %s at %s joined successfully", nodeID, addr)
return nil
}

// Close closes the RaftBalloon. If wait is true, waits for a graceful shutdown.
// Once closed, a RaftBalloon may not be re-opened.
func (b *RaftBalloon) Close(wait bool) error {
Expand Down Expand Up @@ -319,14 +276,6 @@ func (b *RaftBalloon) ID() string {
return b.id
}

// TODO Improve info structure.
// Info returns the Raft leader address.
func (b *RaftBalloon) Info() map[string]interface{} {
m := make(map[string]interface{})
m["LeaderAddr"] = b.addr // LeaderAddr()
return m
}

// LeaderID returns the node ID of the Raft leader. Returns a
// blank string if there is no leader, or an error.
func (b *RaftBalloon) LeaderID() (string, error) {
Expand All @@ -353,7 +302,6 @@ func (b *RaftBalloon) Nodes() ([]raft.Server, error) {
}

return f.Configuration().Servers, nil

}

// Remove removes a node from the store, specified by ID.
Expand Down Expand Up @@ -425,3 +373,54 @@ func (b *RaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.Me
func (b *RaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) {
return b.fsm.QueryConsistency(start, end)
}

// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
// This must be called from the Leader or it will fail.
func (b *RaftBalloon) Join(nodeID, addr string) error {

log.Infof("received join request for remote node %s at %s", nodeID, addr)

configFuture := b.raft.api.GetConfiguration()
if err := configFuture.Error(); err != nil {
log.Errorf("failed to get raft servers configuration: %v", err)
return err
}

for _, srv := range configFuture.Configuration().Servers {
// If a node already exists with either the joining node's ID or address,
// that node may need to be removed from the config first.
if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) {
// However if *both* the ID and the address are the same, then nothing -- not even
// a join operation -- is needed.
if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) {
log.Infof("node %s at %s already member of cluster, ignoring join request", nodeID, addr)
return nil
}

future := b.raft.api.RemoveServer(srv.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err)
}
}
}

f := b.raft.api.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return ErrNotLeader
}
return e.Error()
}

log.Infof("node %s at %s joined successfully", nodeID, addr)
return nil
}

// TODO Improve info structure.
// Info returns the Raft leader address.
func (b *RaftBalloon) Info() map[string]interface{} {
m := make(map[string]interface{})
m["isLeader"] = b.IsLeader()
return m
}
121 changes: 73 additions & 48 deletions tests/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os/exec"
"strings"
"testing"
"time"

"github.com/bbva/qed/testutils/scope"

Expand Down Expand Up @@ -52,7 +53,7 @@ func TestCli(t *testing.T) {
assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd)
})

let("verify event with eventDigest", func(t *testing.T) {
let("Verify event with eventDigest", func(t *testing.T) {
cmd := exec.Command("go",
"run",
"./../../main.go",
Expand All @@ -75,7 +76,8 @@ func TestCli(t *testing.T) {
assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest")
})

let("verify event with eventDigest", func(t *testing.T) {
let("Verify event with eventDigest", func(t *testing.T) {

cmd := exec.Command("go",
"run",
"./../../main.go",
Expand Down Expand Up @@ -107,9 +109,7 @@ func TestCluster(t *testing.T) {
before2, after2 := setupServer(2, "", t)
serversHttpAddr := "http://127.0.0.1:8080,http://127.0.0.1:8081,http://127.0.0.1:8082"

// before3, after3 := setupServer(0, "", t)

scenario, let := scope.Scope(t, merge(before0, before1, before2), merge(after0, after1, after2))
scenario, let := scope.Scope(t, merge(before0, before1, before2), merge(after1, after2))

scenario("Add one event through cli and verify it", func() {
let("Add event", func(t *testing.T) {
Expand All @@ -129,51 +129,76 @@ func TestCluster(t *testing.T) {

assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd)
})
})

let("verify event with eventDigest", func(t *testing.T) {
cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", QEDUrl),
"membership",
"--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a",
"--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00",
"--version=0",
"--eventDigest=8694718de4363adf07ec3b4aff4c76589f60fe89a7715bee7c8b250e06493922",
"--log=info",
"--verify",
)

stdoutStderr, err := cmd.CombinedOutput()

assert.NoError(t, err, "Subprocess must not exit with status 1")
assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest")
})
let("Verify event with eventDigest", func(t *testing.T) {
cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", serversHttpAddr),
"membership",
"--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a",
"--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00",
"--version=0",
"--eventDigest=8694718de4363adf07ec3b4aff4c76589f60fe89a7715bee7c8b250e06493922",
"--log=info",
"--verify",
)

stdoutStderr, err := cmd.CombinedOutput()

assert.NoError(t, err, "Subprocess must not exit with status 1")
assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest")
})

let("Verify event with eventDigest", func(t *testing.T) {

cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", serversHttpAddr),
"membership",
"--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a",
"--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00",
"--version=0",
"--key='test event'",
"--log=info",
"--verify",
)

stdoutStderr, err := cmd.CombinedOutput()

assert.NoError(t, err, "Subprocess must not exit with status 1")
assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest")

})

let("Kill server 0", func(t *testing.T) {
after0(t)
serversHttpAddr = "http://127.0.0.1:8081,http://127.0.0.1:8082"
time.Sleep(2 * time.Second)
})

let("Add second event", func(t *testing.T) {
cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", serversHttpAddr),
"add",
"--key='test event 2'",
"--value=2",
"--log=info",
)

let("verify event with eventDigest", func(t *testing.T) {

cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", QEDUrl),
"membership",
"--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a",
"--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00",
"--version=0",
"--key='test event'",
"--log=info",
"--verify",
)

stdoutStderr, err := cmd.CombinedOutput()

assert.NoError(t, err, "Subprocess must not exit with status 1")
assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest")
_, err := cmd.CombinedOutput()

assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd)
})

})
}

0 comments on commit 75b5df1

Please sign in to comment.