diff --git a/api/apihttp/apihttp.go b/api/apihttp/apihttp.go index 5f0df6d07..1bf6c5657 100644 --- a/api/apihttp/apihttp.go +++ b/api/apihttp/apihttp.go @@ -370,7 +370,10 @@ func InfoHandle(httpEndpoint string, balloon raftwal.RaftBalloonApi) http.Handle w.WriteHeader(http.StatusMethodNotAllowed) return } - out, err := json.Marshal(httpEndpoint) + + info := balloon.Info() + info["httpEndpoint"] = "http://" + httpEndpoint + out, err := json.Marshal(info) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/client/client.go b/client/client.go index 1e9e11dd3..b0b3f3774 100644 --- a/client/client.go +++ b/client/client.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "net" "net/http" "net/url" @@ -92,6 +93,17 @@ func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, erro } } +func (c *HTTPClient) updateClusterLeader() { + info, _ := c.GetClusterInfo() + if info["isLeader"].(bool) { + c.conf.Cluster.Leader = info["httpEndpoint"].(string) + } else { + time.Sleep(100 * time.Millisecond) + c.conf.Cluster.Leader = c.conf.Cluster.Endpoints[rand.Int()%len(c.conf.Cluster.Endpoints)] + c.updateClusterLeader() + } +} + func (c HTTPClient) GetClusterInfo() (map[string]interface{}, error) { info := make(map[string]interface{}) @@ -119,13 +131,6 @@ func (c HTTPClient) GetClusterInfo() (map[string]interface{}, error) { return info, err } -func (c *HTTPClient) updateClusterLeader() { - info, _ := c.GetClusterInfo() - if val, ok := info["LeaderAddr"]; ok { - c.conf.Cluster.Leader = val.(string) - } -} - func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) { url, err := url.Parse(c.conf.Cluster.Leader + path) if err != nil { diff --git a/raftwal/raft.go b/raftwal/raft.go index 3d2958670..00608369c 100644 --- a/raftwal/raft.go +++ b/raftwal/raft.go @@ -191,49 +191,6 @@ func (b *RaftBalloon) Open(bootstrap bool) error { return nil } -// Join joins a node, identified by id and located at addr, to this store. -// The node must be ready to respond to Raft communications at that address. -// This must be called from the Leader or it will fail. -func (b *RaftBalloon) Join(nodeID, addr string) error { - - log.Infof("received join request for remote node %s at %s", nodeID, addr) - - configFuture := b.raft.api.GetConfiguration() - if err := configFuture.Error(); err != nil { - log.Errorf("failed to get raft servers configuration: %v", err) - return err - } - - for _, srv := range configFuture.Configuration().Servers { - // If a node already exists with either the joining node's ID or address, - // that node may need to be removed from the config first. - if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) { - // However if *both* the ID and the address are the same, then nothing -- not even - // a join operation -- is needed. - if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) { - log.Infof("node %s at %s already member of cluster, ignoring join request", nodeID, addr) - return nil - } - - future := b.raft.api.RemoveServer(srv.ID, 0, 0) - if err := future.Error(); err != nil { - return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err) - } - } - } - - f := b.raft.api.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0) - if e := f.(raft.Future); e.Error() != nil { - if e.Error() == raft.ErrNotLeader { - return ErrNotLeader - } - return e.Error() - } - - log.Infof("node %s at %s joined successfully", nodeID, addr) - return nil -} - // Close closes the RaftBalloon. If wait is true, waits for a graceful shutdown. // Once closed, a RaftBalloon may not be re-opened. func (b *RaftBalloon) Close(wait bool) error { @@ -319,14 +276,6 @@ func (b *RaftBalloon) ID() string { return b.id } -// TODO Improve info structure. -// Info returns the Raft leader address. -func (b *RaftBalloon) Info() map[string]interface{} { - m := make(map[string]interface{}) - m["LeaderAddr"] = b.addr // LeaderAddr() - return m -} - // LeaderID returns the node ID of the Raft leader. Returns a // blank string if there is no leader, or an error. func (b *RaftBalloon) LeaderID() (string, error) { @@ -353,7 +302,6 @@ func (b *RaftBalloon) Nodes() ([]raft.Server, error) { } return f.Configuration().Servers, nil - } // Remove removes a node from the store, specified by ID. @@ -425,3 +373,54 @@ func (b *RaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.Me func (b *RaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) { return b.fsm.QueryConsistency(start, end) } + +// Join joins a node, identified by id and located at addr, to this store. +// The node must be ready to respond to Raft communications at that address. +// This must be called from the Leader or it will fail. +func (b *RaftBalloon) Join(nodeID, addr string) error { + + log.Infof("received join request for remote node %s at %s", nodeID, addr) + + configFuture := b.raft.api.GetConfiguration() + if err := configFuture.Error(); err != nil { + log.Errorf("failed to get raft servers configuration: %v", err) + return err + } + + for _, srv := range configFuture.Configuration().Servers { + // If a node already exists with either the joining node's ID or address, + // that node may need to be removed from the config first. + if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) { + // However if *both* the ID and the address are the same, then nothing -- not even + // a join operation -- is needed. + if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) { + log.Infof("node %s at %s already member of cluster, ignoring join request", nodeID, addr) + return nil + } + + future := b.raft.api.RemoveServer(srv.ID, 0, 0) + if err := future.Error(); err != nil { + return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err) + } + } + } + + f := b.raft.api.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0) + if e := f.(raft.Future); e.Error() != nil { + if e.Error() == raft.ErrNotLeader { + return ErrNotLeader + } + return e.Error() + } + + log.Infof("node %s at %s joined successfully", nodeID, addr) + return nil +} + +// TODO Improve info structure. +// Info returns the Raft leader address. +func (b *RaftBalloon) Info() map[string]interface{} { + m := make(map[string]interface{}) + m["isLeader"] = b.IsLeader() + return m +} diff --git a/tests/e2e/cli_test.go b/tests/e2e/cli_test.go index c4ff4e486..5917717e2 100644 --- a/tests/e2e/cli_test.go +++ b/tests/e2e/cli_test.go @@ -21,6 +21,7 @@ import ( "os/exec" "strings" "testing" + "time" "github.com/bbva/qed/testutils/scope" @@ -52,7 +53,7 @@ func TestCli(t *testing.T) { assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd) }) - let("verify event with eventDigest", func(t *testing.T) { + let("Verify event with eventDigest", func(t *testing.T) { cmd := exec.Command("go", "run", "./../../main.go", @@ -75,7 +76,8 @@ func TestCli(t *testing.T) { assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest") }) - let("verify event with eventDigest", func(t *testing.T) { + let("Verify event with eventDigest", func(t *testing.T) { + cmd := exec.Command("go", "run", "./../../main.go", @@ -107,9 +109,7 @@ func TestCluster(t *testing.T) { before2, after2 := setupServer(2, "", t) serversHttpAddr := "http://127.0.0.1:8080,http://127.0.0.1:8081,http://127.0.0.1:8082" - // before3, after3 := setupServer(0, "", t) - - scenario, let := scope.Scope(t, merge(before0, before1, before2), merge(after0, after1, after2)) + scenario, let := scope.Scope(t, merge(before0, before1, before2), merge(after1, after2)) scenario("Add one event through cli and verify it", func() { let("Add event", func(t *testing.T) { @@ -129,51 +129,76 @@ func TestCluster(t *testing.T) { assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd) }) - }) - let("verify event with eventDigest", func(t *testing.T) { - cmd := exec.Command("go", - "run", - "./../../main.go", - fmt.Sprintf("--apikey=%s", APIKey), - "client", - fmt.Sprintf("--endpoints=%s", QEDUrl), - "membership", - "--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a", - "--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00", - "--version=0", - "--eventDigest=8694718de4363adf07ec3b4aff4c76589f60fe89a7715bee7c8b250e06493922", - "--log=info", - "--verify", - ) - - stdoutStderr, err := cmd.CombinedOutput() - - assert.NoError(t, err, "Subprocess must not exit with status 1") - assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest") - }) + let("Verify event with eventDigest", func(t *testing.T) { + cmd := exec.Command("go", + "run", + "./../../main.go", + fmt.Sprintf("--apikey=%s", APIKey), + "client", + fmt.Sprintf("--endpoints=%s", serversHttpAddr), + "membership", + "--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a", + "--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00", + "--version=0", + "--eventDigest=8694718de4363adf07ec3b4aff4c76589f60fe89a7715bee7c8b250e06493922", + "--log=info", + "--verify", + ) + + stdoutStderr, err := cmd.CombinedOutput() + + assert.NoError(t, err, "Subprocess must not exit with status 1") + assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest") + }) + + let("Verify event with eventDigest", func(t *testing.T) { + + cmd := exec.Command("go", + "run", + "./../../main.go", + fmt.Sprintf("--apikey=%s", APIKey), + "client", + fmt.Sprintf("--endpoints=%s", serversHttpAddr), + "membership", + "--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a", + "--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00", + "--version=0", + "--key='test event'", + "--log=info", + "--verify", + ) + + stdoutStderr, err := cmd.CombinedOutput() + + assert.NoError(t, err, "Subprocess must not exit with status 1") + assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest") + + }) + + let("Kill server 0", func(t *testing.T) { + after0(t) + serversHttpAddr = "http://127.0.0.1:8081,http://127.0.0.1:8082" + time.Sleep(2 * time.Second) + }) + + let("Add second event", func(t *testing.T) { + cmd := exec.Command("go", + "run", + "./../../main.go", + fmt.Sprintf("--apikey=%s", APIKey), + "client", + fmt.Sprintf("--endpoints=%s", serversHttpAddr), + "add", + "--key='test event 2'", + "--value=2", + "--log=info", + ) - let("verify event with eventDigest", func(t *testing.T) { - - cmd := exec.Command("go", - "run", - "./../../main.go", - fmt.Sprintf("--apikey=%s", APIKey), - "client", - fmt.Sprintf("--endpoints=%s", QEDUrl), - "membership", - "--hyperDigest=81ae2d8f6ecec9c5837d12a09e3b42a1c880b6c77f81ff1f85aef36dac4fdf6a", - "--historyDigest=0f5129eaf5dbfb1405ff072a04d716aaf4e4ba4247a3322c41582e970dbb7b00", - "--version=0", - "--key='test event'", - "--log=info", - "--verify", - ) - - stdoutStderr, err := cmd.CombinedOutput() - - assert.NoError(t, err, "Subprocess must not exit with status 1") - assert.True(t, strings.Contains(fmt.Sprintf("%s", stdoutStderr), "Verify: OK"), "Must verify with eventDigest") + _, err := cmd.CombinedOutput() + + assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd) + }) }) }