Skip to content

Commit

Permalink
Client can now connect to qed cluster-leader being topology-aware.
Browse files Browse the repository at this point in the history
Metadata broadcast via raft.
Some refactor.
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 21, 2019
1 parent a56e376 commit 78868e6
Show file tree
Hide file tree
Showing 14 changed files with 377 additions and 53 deletions.
4 changes: 2 additions & 2 deletions api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (b fakeRaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
return &balloon.Snapshot{hashing.Digest{0x02}, hashing.Digest{0x00}, hashing.Digest{0x01}, 0}, nil
}

func (b fakeRaftBalloon) Join(nodeID, addr string) error {
func (b fakeRaftBalloon) Join(nodeID, addr string, metadata map[string]string) error {
return nil
}

Expand Down Expand Up @@ -389,7 +389,7 @@ func BenchmarkApiAdd(b *testing.B) {
r, clean := newNodeBench(b, 1)
defer clean()

err := r.Open(true)
err := r.Open(true, map[string]string{"foo": "bar"})
assert.NoError(b, err)

handler := Add(r)
Expand Down
24 changes: 18 additions & 6 deletions api/mgmthttp/mgmthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,46 @@ func NewMgmtHttp(raftBalloon raftwal.RaftBalloonApi) *http.ServeMux {

func joinHandle(raftBalloon raftwal.RaftBalloonApi) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
m := map[string]string{}
body := make(map[string]interface{})

if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

if len(m) != 2 {
if len(body) != 3 {
w.WriteHeader(http.StatusBadRequest)
return
}

remoteAddr, ok := m["addr"]
remoteAddr, ok := body["addr"].(string)
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}

nodeID, ok := m["id"]
nodeID, ok := body["id"].(string)
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}

if err := raftBalloon.Join(nodeID, remoteAddr); err != nil {
m, ok := body["metadata"].(map[string]interface{})
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
// TO IMPROVE: use map[string]interface{} for nested metadata.
metadata := make(map[string]string)
for k, v := range m {
metadata[k] = v.(string)
}

if err := raftBalloon.Join(nodeID, remoteAddr, metadata); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}
}
29 changes: 29 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (c HTTPClient) getClusterInfo() (map[string]interface{}, error) {
if err != nil {
return info, err
}

err = json.Unmarshal(body, &info)
if err != nil {
return info, err
Expand All @@ -133,6 +134,34 @@ func (c HTTPClient) getClusterInfo() (map[string]interface{}, error) {
return info, err
}

func (c *HTTPClient) updateConf(info map[string]interface{}) {

clusterMeta := info["meta"].(map[string]interface{})
leaderID := info["leaderID"].(string)
scheme := info["URIScheme"].(string)

var leaderAddr string
var endpoints []string

leaderMeta := clusterMeta[leaderID].(map[string]interface{})
for k, addr := range leaderMeta {
if k == "HTTPAddr" {
leaderAddr = scheme + addr.(string)
}
}
c.conf.Cluster.Leader = leaderAddr

for _, nodeMeta := range clusterMeta {
for k, address := range nodeMeta.(map[string]interface{}) {
if k == "HTTPAddr" {
url := scheme + address.(string)
endpoints = append(endpoints, url)
}
}
}
c.conf.Cluster.Endpoints = endpoints
}

func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
url, err := url.Parse(c.conf.Cluster.Leader + path)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/bbva/qed/balloon/visitor"
Expand All @@ -42,6 +43,9 @@ func init() {
func setup() func() {
mux = http.NewServeMux()
server = httptest.NewServer(mux)

mux.HandleFunc("/info/shards", infoHandler(server.URL))

client = NewHTTPClient(Config{
Cluster: QEDCluster{Endpoints: []string{server.URL}, Leader: server.URL},
APIKey: "my-awesome-api-key",
Expand Down Expand Up @@ -193,3 +197,23 @@ func serverErrorHandler() func(http.ResponseWriter, *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
}

func infoHandler(serverURL string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

var md = make(map[string]interface{})
md["nodeID"] = "node01"
md["leaderID"] = "node01"
md["URIScheme"] = "http://"
md["meta"] = map[string]map[string]string{
"node01": map[string]string{
"HTTPAddr": strings.Trim(serverURL, "http://"),
},
}

out, _ := json.Marshal(md)
_, _ = w.Write(out)
}
}
8 changes: 7 additions & 1 deletion raftwal/commands/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ type CommandType uint8

const (
AddEventCommandType CommandType = 0 // Commands which modify the database.
MetadataDeleteCommandType CommandType = 1
MetadataSetCommandType CommandType = 1
MetadataDeleteCommandType CommandType = 2
)

type AddEventCommand struct {
Event []byte
}

type MetadataSetCommand struct {
Id string
Data map[string]string
}

type MetadataDeleteCommand struct {
Id string
}
Expand Down
108 changes: 107 additions & 1 deletion raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package raftwal

import (
"bytes"
// "encoding/binary"
"encoding/json"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -51,6 +53,9 @@ type BalloonFSM struct {

agentsQueue chan *protocol.Snapshot

metaMu sync.RWMutex
meta map[string]map[string]string

restoreMu sync.RWMutex // Restore needs exclusive access to database.
}

Expand Down Expand Up @@ -87,6 +92,7 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, ag
balloon: b,
state: state,
agentsQueue: agentsQueue,
meta: make(map[string]map[string]string),
}, nil
}

Expand Down Expand Up @@ -139,6 +145,36 @@ func (fsm *BalloonFSM) Apply(l *raft.Log) interface{} {
return fsm.applyAdd(cmd.Event, newState)
}
return &fsmAddResponse{error: fmt.Errorf("state already applied!: %+v -> %+v", fsm.state, newState)}

case commands.MetadataSetCommandType:
var cmd commands.MetadataSetCommand
if err := commands.Decode(buf[1:], &cmd); err != nil {
return &fsmGenericResponse{error: err}
}

fsm.metaMu.Lock()
defer fsm.metaMu.Unlock()
if _, ok := fsm.meta[cmd.Id]; !ok {
fsm.meta[cmd.Id] = make(map[string]string)
}
for k, v := range cmd.Data {
fsm.meta[cmd.Id][k] = v
}

return &fsmGenericResponse{}

case commands.MetadataDeleteCommandType:
var cmd commands.MetadataDeleteCommand
if err := commands.Decode(buf[1:], &cmd); err != nil {
return &fsmGenericResponse{error: err}
}

fsm.metaMu.Lock()
defer fsm.metaMu.Unlock()
delete(fsm.meta, cmd.Id)

return &fsmGenericResponse{}

default:
return &fsmGenericResponse{error: fmt.Errorf("unknown command: %v", cmdType)}

Expand All @@ -151,12 +187,21 @@ func (fsm *BalloonFSM) Apply(l *raft.Log) interface{} {
func (fsm *BalloonFSM) Snapshot() (raft.FSMSnapshot, error) {
fsm.restoreMu.Lock()
defer fsm.restoreMu.Unlock()

version, err := fsm.store.GetLastVersion()
if err != nil {
return nil, err
}
log.Debugf("Generating snapshot until version: %d (balloon version %d)", version, fsm.balloon.Version())
return &fsmSnapshot{lastVersion: version, store: fsm.store}, nil

// Copy the node metadata.
meta, err := json.Marshal(fsm.meta)
if err != nil {
log.Debugf("failed to encode meta for snapshot: %s", err.Error())
return nil, err
}

return &fsmSnapshot{lastVersion: version, store: fsm.store, meta: meta}, nil
}

// Restore restores the node to a previous state.
Expand All @@ -170,6 +215,26 @@ func (fsm *BalloonFSM) Restore(rc io.ReadCloser) error {
if err = fsm.store.Load(rc); err != nil {
return err
}

// TODO: Restore metadata??

// log.Debug("Restoring Metadata...")
// var sz uint64

// // Get size of meta, read those bytes, and set to meta.
// if err := binary.Read(rc, binary.LittleEndian, &sz); err != nil {
// return err
// }
// meta := make([]byte, sz)
// if _, err := io.ReadFull(rc, meta); err != nil {
// return err
// }
// err = func() error {
// fsm.metaMu.Lock()
// defer fsm.metaMu.Unlock()
// return json.Unmarshal(meta, &fsm.meta)
// }()

return fsm.balloon.RefreshVersion()
}

Expand Down Expand Up @@ -223,3 +288,44 @@ func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
err := enc.Encode(in)
return buf, err
}

// Metadata returns the value for a given key, for a given node ID.
func (fsm *BalloonFSM) Metadata(id, key string) string {
fsm.metaMu.RLock()
defer fsm.metaMu.RUnlock()

if _, ok := fsm.meta[id]; !ok {
return ""
}
v, ok := fsm.meta[id][key]
if ok {
return v
}
return ""
}

// setMetadata adds the metadata md to any existing metadata for
// the given node ID.
func (fsm *BalloonFSM) setMetadata(id string, md map[string]string) *commands.MetadataSetCommand {
// Check local data first.
if func() bool {
fsm.metaMu.RLock()
defer fsm.metaMu.RUnlock()
if _, ok := fsm.meta[id]; ok {
for k, v := range md {
if fsm.meta[id][k] != v {
return false
}
}
return true
}
return false
}() {
// Local data is same as data being pushed in,
// nothing to do.
return nil
}
cmd := &commands.MetadataSetCommand{Id: id, Data: md}

return cmd
}
Loading

0 comments on commit 78868e6

Please sign in to comment.