Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 21, 2019
1 parent 0a392b6 commit ef23483
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 59 deletions.
20 changes: 20 additions & 0 deletions api/apihttp/apihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func NewApiHttp(balloon raftwal.RaftBalloonApi) *http.ServeMux {
api.HandleFunc("/proofs/membership", AuthHandlerMiddleware(Membership(balloon)))
api.HandleFunc("/proofs/digest-membership", AuthHandlerMiddleware(DigestMembership(balloon)))
api.HandleFunc("/proofs/incremental", AuthHandlerMiddleware(Incremental(balloon)))
api.HandleFunc("/leader", AuthHandlerMiddleware(LeaderHandle(balloon)))

return api
}
Expand Down Expand Up @@ -360,3 +361,22 @@ func LogHandler(handle http.Handler) http.HandlerFunc {
}
}
}

func LeaderHandle(balloon raftwal.RaftBalloonApi) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
w.Header().Set("Allow", "GET")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

out, err := json.Marshal(balloon.LeaderAddr())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(out)
return
}
}
44 changes: 24 additions & 20 deletions api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,41 +53,45 @@ func (b fakeRaftBalloon) Join(nodeID, addr string) error {

func (b fakeRaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) {
return &balloon.MembershipProof{
true,
visitor.NewFakeVerifiable(true),
visitor.NewFakeVerifiable(true),
1,
1,
2,
keyDigest,
hashing.NewFakeXorHasher(),
Exists: true,
HyperProof: visitor.NewFakeVerifiable(true),
HistoryProof: visitor.NewFakeVerifiable(true),
CurrentVersion: 1,
QueryVersion: 1,
ActualVersion: 2,
KeyDigest: keyDigest,
Hasher: hashing.NewFakeXorHasher(),
}, nil
}

func (b fakeRaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) {
hasher := hashing.NewFakeXorHasher()
return &balloon.MembershipProof{
true,
visitor.NewFakeVerifiable(true),
visitor.NewFakeVerifiable(true),
1,
1,
2,
hasher.Do(event),
hasher,
Exists: true,
HyperProof: visitor.NewFakeVerifiable(true),
HistoryProof: visitor.NewFakeVerifiable(true),
CurrentVersion: 1,
QueryVersion: 1,
ActualVersion: 2,
KeyDigest: hasher.Do(event),
Hasher: hasher,
}, nil
}

func (b fakeRaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) {
ip := balloon.IncrementalProof{
2,
8,
visitor.AuditPath{"0|0": hashing.Digest{0x00}},
hashing.NewFakeXorHasher(),
Start: 2,
End: 8,
AuditPath: visitor.AuditPath{"0|0": hashing.Digest{0x00}},
Hasher: hashing.NewFakeXorHasher(),
}
return &ip, nil
}

func (b fakeRaftBalloon) LeaderAddr() string {
return b.raftBindAddr
}

func TestHealthCheckHandler(t *testing.T) {
// Create a request to pass to our handler. We don't have any query parameters for now, so we'll
// pass 'nil' as the third parameter.
Expand Down
84 changes: 55 additions & 29 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
// HTTPClient ist the stuct that has the required information for the cli.
type HTTPClient struct {
conf *Config

*http.Client
}

Expand All @@ -51,7 +50,7 @@ func NewHTTPClient(conf Config) *HTTPClient {
tlsConf = &tls.Config{}
}

return &HTTPClient{
client := &HTTPClient{
&conf,
&http.Client{
Timeout: time.Second * 10,
Expand All @@ -65,15 +64,23 @@ func NewHTTPClient(conf Config) *HTTPClient {
},
}

if len(conf.Cluster.Endpoints) == 1 {
conf.Cluster.Leader = conf.Cluster.Endpoints[0]
} else {
client.checkClusterLeader()
}

return client
}

func (c HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, error) {
func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, error) {

var retries uint

for {
resp, err := c.Do(req)
if err != nil {
// c.checkClusterLeader()
if retries == 5 {
return nil, err
}
Expand All @@ -87,8 +94,28 @@ func (c HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, error

}

func (c *HTTPClient) checkClusterLeader() {
var data []byte

url := c.conf.Cluster.Endpoints[0]
path := "/leader"
req, err := http.NewRequest("GET", url+path, bytes.NewBuffer(data))
if err != nil {
panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)
resp, err := c.Do(req)
if err != nil {
panic(err)
}

bodyBytes, _ := ioutil.ReadAll(resp.Body)
c.conf.Cluster.Leader = string(bodyBytes)
}

func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
url, err := url.Parse(c.conf.Endpoint + path)
url, err := url.Parse(c.conf.Cluster.Leader + path)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -118,7 +145,6 @@ func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
}

return bodyBytes, nil

}

// Ping will do a request to the server
Expand All @@ -132,28 +158,28 @@ func (c HTTPClient) Ping() error {
}

// Add will do a request to the server with a post data to store a new event.
func (c HTTPClient) Add(event string) (*protocol.Snapshot, error) {
func (c *HTTPClient) Add(event string) (*protocol.Snapshot, error) {

data, _ := json.Marshal(&protocol.Event{[]byte(event)})
data, _ := json.Marshal(&protocol.Event{Event: []byte(event)})

body, err := c.doReq("POST", "/events", data)
if err != nil {
return nil, err
}

var snapshot protocol.Snapshot
json.Unmarshal(body, &snapshot)
_ = json.Unmarshal(body, &snapshot)

return &snapshot, nil

}

// Membership will ask for a Proof to the server.
func (c HTTPClient) Membership(key []byte, version uint64) (*protocol.MembershipResult, error) {
func (c *HTTPClient) Membership(key []byte, version uint64) (*protocol.MembershipResult, error) {

query, _ := json.Marshal(&protocol.MembershipQuery{
key,
version,
Key: key,
Version: version,
})

body, err := c.doReq("POST", "/proofs/membership", query)
Expand All @@ -169,7 +195,7 @@ func (c HTTPClient) Membership(key []byte, version uint64) (*protocol.Membership
}

// Membership will ask for a Proof to the server.
func (c HTTPClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (*protocol.MembershipResult, error) {
func (c *HTTPClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (*protocol.MembershipResult, error) {

query, _ := json.Marshal(&protocol.MembershipDigest{
KeyDigest: keyDigest,
Expand All @@ -189,7 +215,7 @@ func (c HTTPClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (
}

// Incremental will ask for an IncrementalProof to the server.
func (c HTTPClient) Incremental(start, end uint64) (*protocol.IncrementalResponse, error) {
func (c *HTTPClient) Incremental(start, end uint64) (*protocol.IncrementalResponse, error) {

query, _ := json.Marshal(&protocol.IncrementalRequest{
Start: start,
Expand Down Expand Up @@ -218,10 +244,10 @@ func (c HTTPClient) Verify(
proof := protocol.ToBalloonProof(result, hasherF)

return proof.Verify(snap.EventDigest, &balloon.Snapshot{
snap.EventDigest,
snap.HistoryDigest,
snap.HyperDigest,
snap.Version,
EventDigest: snap.EventDigest,
HistoryDigest: snap.HistoryDigest,
HyperDigest: snap.HyperDigest,
Version: snap.Version,
})

}
Expand All @@ -237,10 +263,10 @@ func (c HTTPClient) DigestVerify(
proof := protocol.ToBalloonProof(result, hasherF)

return proof.DigestVerify(snap.EventDigest, &balloon.Snapshot{
snap.EventDigest,
snap.HistoryDigest,
snap.HyperDigest,
snap.Version,
EventDigest: snap.EventDigest,
HistoryDigest: snap.HistoryDigest,
HyperDigest: snap.HyperDigest,
Version: snap.Version,
})

}
Expand All @@ -254,16 +280,16 @@ func (c HTTPClient) VerifyIncremental(
proof := protocol.ToIncrementalProof(result, hasher)

start := &balloon.Snapshot{
startSnapshot.EventDigest,
startSnapshot.HistoryDigest,
startSnapshot.HyperDigest,
startSnapshot.Version,
EventDigest: startSnapshot.EventDigest,
HistoryDigest: startSnapshot.HistoryDigest,
HyperDigest: startSnapshot.HyperDigest,
Version: startSnapshot.Version,
}
end := &balloon.Snapshot{
endSnapshot.EventDigest,
endSnapshot.HistoryDigest,
endSnapshot.HyperDigest,
endSnapshot.Version,
HistoryDigest: endSnapshot.EventDigest,
EventDigest: endSnapshot.HistoryDigest,
HyperDigest: endSnapshot.HyperDigest,
Version: endSnapshot.Version,
}

return proof.Verify(start, end)
Expand Down
4 changes: 2 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func setup() func() {
mux = http.NewServeMux()
server = httptest.NewServer(mux)
client = NewHTTPClient(Config{
Endpoint: server.URL,
APIKey: "my-awesome-api-key",
Cluster: QEDCluster{Endpoints: []string{server.URL}, Leader: server.URL},
APIKey: "my-awesome-api-key",
Insecure: false,
})
return func() {
Expand Down
11 changes: 8 additions & 3 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package client

type Config struct {
// Server host:port to consult.
Endpoint string
// Cluster [host:port,host:port,...] to consult.
Cluster QEDCluster

// ApiKey to query the server endpoint.
APIKey string
Expand All @@ -36,9 +36,14 @@ type Config struct {
HandshakeTimeoutSeconds int
}

type QEDCluster struct {
Endpoints []string
Leader string
}

func DefaultConfig() *Config {
return &Config{
Endpoint: "localhost:8800",
Cluster: QEDCluster{[]string{"127.0.0.1:8800"}, ""},
APIKey: "my-key",
Insecure: true,
TimeoutSeconds: 10,
Expand Down
6 changes: 3 additions & 3 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func newClientCommand(ctx *cmdContext) *cobra.Command {
}

f := cmd.PersistentFlags()
f.StringVarP(&clientCtx.config.Endpoint, "endpoint", "e", "127.0.0.1:8800", "Endpoint for REST requests on (host:port)")
f.StringSliceVarP(&clientCtx.config.Endpoints, "endpoints", "e", []string{"127.0.0.1:8800"}, "Endpoint for REST requests on (host:port)")
f.BoolVar(&clientCtx.config.Insecure, "insecure", false, "Allow self signed certificates")
f.IntVar(&clientCtx.config.TimeoutSeconds, "timeout-seconds", 10, "Seconds to cut the connection")
f.IntVar(&clientCtx.config.DialTimeoutSeconds, "dial-timeout-seconds", 5, "Seconds to cut the dialing")
f.IntVar(&clientCtx.config.HandshakeTimeoutSeconds, "handshake-timeout-seconds", 5, "Seconds to cut the handshaking")

// Lookups
v.BindPFlag("client.endpoint", f.Lookup("endpoint"))
v.BindPFlag("client.endpoints", f.Lookup("endpoints"))
v.BindPFlag("client.insecure", f.Lookup("insecure"))
v.BindPFlag("client.timeout.connection", f.Lookup("timeout-seconds"))
v.BindPFlag("client.timeout.dial", f.Lookup("dial-timeout-seconds"))
Expand All @@ -52,7 +52,7 @@ func newClientCommand(ctx *cmdContext) *cobra.Command {
log.SetLogger("QEDClient", ctx.logLevel)

clientCtx.config.APIKey = ctx.apiKey
clientCtx.config.Endpoint = v.GetString("client.endpoint")
clientCtx.config.Endpoints = v.GetStringSlice("client.endpoints")
clientCtx.config.Insecure = v.GetBool("client.insecure")
clientCtx.config.TimeoutSeconds = v.GetInt("client.timeout.connection")
clientCtx.config.DialTimeoutSeconds = v.GetInt("client.timeout.dial")
Expand Down
4 changes: 4 additions & 0 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ type Auditor struct {
func NewAuditor(conf Config) (*Auditor, error) {
metrics.Qed_auditor_instances_count.Inc()
auditor := Auditor{
<<<<<<< HEAD
qed: client.NewHTTPClient(client.Config{
Endpoint: conf.QEDUrls[0],
APIKey: conf.APIKey,
Insecure: false,
}),
=======
qed: client.NewHttpClient(conf.QEDUrls, conf.APIKey),
>>>>>>> WIP
conf: conf,
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
Expand Down
2 changes: 1 addition & 1 deletion gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewMonitor(conf Config) (*Monitor, error) {

monitor := Monitor{
client: client.NewHTTPClient(client.Config{
Endpoint: conf.QEDUrls[0],
Cluster: conf.QEDUrls,
APIKey: conf.APIKey,
Insecure: false,
}),
Expand Down
1 change: 1 addition & 0 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type RaftBalloonApi interface {
QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error)
// Join joins the node, identified by nodeID and reachable at addr, to the cluster
Join(nodeID, addr string) error
LeaderAddr() string
}

// RaftBalloon is a replicated verifiable key-value store, where changes are made via Raft consensus.
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestCli(t *testing.T) {

_, err := cmd.CombinedOutput()

assert.NoError(t, err, "Subprocess must not exit with status 1")
assert.NoError(t, err, "%v", *cmd) //"Subprocess must not exit with status 1")
})

let("verify event with eventDigest", func(t *testing.T) {
Expand Down

0 comments on commit ef23483

Please sign in to comment.