From aa430633b863402b92d77606ee2f941b564a024d Mon Sep 17 00:00:00 2001 From: iknite Date: Tue, 4 Dec 2018 15:05:29 +0100 Subject: [PATCH] Add query digest membership --- api/apihttp/apihttp.go | 57 ++++++++++++++++++++++++++++- api/apihttp/apihttp_test.go | 72 +++++++++++++++++++++++++++++++++---- balloon/balloon.go | 21 ++++++++--- client/client.go | 37 +++++++++++++++++-- client/client_test.go | 24 +++++++++++++ cmd/client_add.go | 2 +- cmd/client_membership.go | 33 +++++++++++------ gossip/auditor/auditor.go | 4 +-- protocol/protocol.go | 41 +++++++++++++++++---- raftwal/fsm.go | 4 +++ raftwal/raft.go | 5 +++ 11 files changed, 264 insertions(+), 36 deletions(-) diff --git a/api/apihttp/apihttp.go b/api/apihttp/apihttp.go index 9f0d1813f..5bcb779ef 100644 --- a/api/apihttp/apihttp.go +++ b/api/apihttp/apihttp.go @@ -109,7 +109,7 @@ func Add(balloon raftwal.RaftBalloonApi) http.HandlerFunc { response.HistoryDigest, response.HyperDigest, response.Version, - event.Event, + response.EventDigest, } out, err := json.Marshal(snapshot) @@ -176,6 +176,60 @@ func Membership(balloon raftwal.RaftBalloonApi) http.HandlerFunc { } } +// DigestMembership returns a membershipProof from the system +// The http post url is: +// POST /proofs/digest-membership +// +// Differs from Membership in that instead of sending the raw event we query +// with the keyDigest which is the digest of the event. +// +// The following statuses are expected: +// If everything is alright, the HTTP status is 201 and the body contains: +// { +// "key": "TG9yZW0gaXBzdW0gZGF0dW0gbm9uIGNvcnJ1cHR1bSBlc3QK", +// "keyDigest": "NDRkMmY3MjEzYjlhMTI4ZWRhZjQzNWFhNjcyMzUxMGE0YTRhOGY5OWEzOWNiYTVhN2FhMWI5OWEwYTlkYzE2NCAgLQo=", +// "isMember": "true", +// "proofs": [""], +// "queryVersion": "1", +// "actualVersion": "2", +// } +func DigestMembership(balloon raftwal.RaftBalloonApi) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + // Make sure we can only be called with an HTTP POST request. + if r.Method != "POST" { + w.Header().Set("Allow", "POST") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + var query protocol.MembershipDigest + err := json.NewDecoder(r.Body).Decode(&query) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Wait for the response + proof, err := balloon.QueryDigestMembership(query.KeyDigest, query.Version) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + out, err := json.Marshal(protocol.ToMembershipResult([]byte(nil), proof)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(out) + return + + } +} + // Incremental returns an incrementalProof from the system // The http post url is: // POST /proofs/incremental @@ -250,6 +304,7 @@ func NewApiHttp(balloon raftwal.RaftBalloonApi) *http.ServeMux { api.HandleFunc("/health-check", AuthHandlerMiddleware(HealthCheckHandler)) api.HandleFunc("/events", AuthHandlerMiddleware(Add(balloon))) api.HandleFunc("/proofs/membership", AuthHandlerMiddleware(Membership(balloon))) + api.HandleFunc("/proofs/digest-membership", AuthHandlerMiddleware(DigestMembership(balloon))) api.HandleFunc("/proofs/incremental", AuthHandlerMiddleware(Incremental(balloon))) return api diff --git a/api/apihttp/apihttp_test.go b/api/apihttp/apihttp_test.go index a36f00e70..322a4f629 100644 --- a/api/apihttp/apihttp_test.go +++ b/api/apihttp/apihttp_test.go @@ -51,18 +51,21 @@ func (b fakeRaftBalloon) Join(nodeID, addr string) error { return nil } -func (b fakeRaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) { - mp := &balloon.MembershipProof{ +func (b fakeRaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) { + return &balloon.MembershipProof{ true, visitor.NewFakeVerifiable(true), visitor.NewFakeVerifiable(true), 1, 1, 2, - hashing.Digest{0x0}, + keyDigest, hashing.NewFakeXorHasher(), - } - return mp, nil + }, nil +} + +func (b fakeRaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) { + return b.QueryDigestMembership(event, version) } func (b fakeRaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) { @@ -163,7 +166,64 @@ func TestMembership(t *testing.T) { // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. rr := httptest.NewRecorder() handler := Membership(fakeRaftBalloon{}) - expectedResult := &protocol.MembershipResult{Exists: true, Hyper: visitor.AuditPath{}, History: visitor.AuditPath{}, CurrentVersion: 0x1, QueryVersion: 0x1, ActualVersion: 0x2, KeyDigest: []uint8{0x0}, Key: []uint8{0x74, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x20, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x65, 0x76, 0x65, 0x6e, 0x74}} + expectedResult := &protocol.MembershipResult{ + Exists: true, + Hyper: visitor.AuditPath{}, + History: visitor.AuditPath{}, + CurrentVersion: 0x1, + QueryVersion: 0x1, + ActualVersion: 0x2, + KeyDigest: []uint8{0x0}, + Key: key, + } + + // Our handlers satisfy http.Handler, so we can call their ServeHTTP method + // directly and pass in our Request and ResponseRecorder. + handler.ServeHTTP(rr, req) + + // Check the status code is what we expect. + if status := rr.Code; status != http.StatusOK { + t.Fatalf("handler returned wrong status code: got %v want %v", + status, http.StatusOK) + } + + // Check the body response + actualResult := new(protocol.MembershipResult) + json.Unmarshal([]byte(rr.Body.String()), actualResult) + + assert.Equal(t, expectedResult, actualResult, "Incorrect proof") + +} + +func TestDigestMembership(t *testing.T) { + + version := uint64(1) + hasher := hashing.NewSha256Hasher() + eventDigest := hasher.Do([]byte("this is a sample event")) + + query, _ := json.Marshal(protocol.MembershipDigest{ + eventDigest, + version, + }) + + req, err := http.NewRequest("POST", "/proofs/digest-membership", bytes.NewBuffer(query)) + if err != nil { + t.Fatal(err) + } + + // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. + rr := httptest.NewRecorder() + handler := DigestMembership(fakeRaftBalloon{}) + expectedResult := &protocol.MembershipResult{ + Exists: true, + Hyper: visitor.AuditPath{}, + History: visitor.AuditPath{}, + CurrentVersion: 0x1, + QueryVersion: 0x1, + ActualVersion: 0x2, + KeyDigest: eventDigest, + Key: []byte(nil), + } // Our handlers satisfy http.Handler, so we can call their ServeHTTP method // directly and pass in our Request and ResponseRecorder. diff --git a/balloon/balloon.go b/balloon/balloon.go index a8eefc2a3..2dd90cd59 100644 --- a/balloon/balloon.go +++ b/balloon/balloon.go @@ -97,6 +97,7 @@ func NewMembershipProof( currentVersion, queryVersion, actualVersion uint64, keyDigest hashing.Digest, Hasher hashing.Hasher) *MembershipProof { + return &MembershipProof{ exists, hyperProof, @@ -109,15 +110,14 @@ func NewMembershipProof( } } -// Verify verifies a proof and answer from QueryMembership. Returns true if the +// DigestVerify verifies a proof and answer from QueryMembership. Returns true if the // answer and proof are correct and consistent, otherwise false. // Run by a client on input that should be verified. -func (p MembershipProof) Verify(event []byte, snapshot *Snapshot) bool { +func (p MembershipProof) DigestVerify(digest hashing.Digest, snapshot *Snapshot) bool { if p.HyperProof == nil || p.HistoryProof == nil { return false } - digest := p.Hasher.Do(event) hyperCorrect := p.HyperProof.Verify(digest, snapshot.HyperDigest) if p.Exists { @@ -130,6 +130,13 @@ func (p MembershipProof) Verify(event []byte, snapshot *Snapshot) bool { return hyperCorrect } +// Verify verifies a proof and answer from QueryMembership. Returns true if the +// answer and proof are correct and consistent, otherwise false. +// Run by a client on input that should be verified. +func (p MembershipProof) Verify(event []byte, snapshot *Snapshot) bool { + return p.DigestVerify(p.Hasher.Do(event), snapshot) +} + type IncrementalProof struct { Start, End uint64 AuditPath visitor.AuditPath @@ -223,7 +230,7 @@ func (b *Balloon) Add(event []byte) (*Snapshot, []*storage.Mutation, error) { return snapshot, mutations, nil } -func (b Balloon) QueryMembership(event []byte, version uint64) (*MembershipProof, error) { +func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*MembershipProof, error) { stats := metrics.Balloon stats.AddFloat("QueryMembership", 1) var proof MembershipProof @@ -233,7 +240,7 @@ func (b Balloon) QueryMembership(event []byte, version uint64) (*MembershipProof var historyProof *history.MembershipProof proof.Hasher = b.hasherF() - proof.KeyDigest = proof.Hasher.Do(event) + proof.KeyDigest = keyDigest proof.QueryVersion = version proof.CurrentVersion = b.version - 1 @@ -272,6 +279,10 @@ func (b Balloon) QueryMembership(event []byte, version uint64) (*MembershipProof return &proof, nil } +func (b Balloon) QueryMembership(event []byte, version uint64) (*MembershipProof, error) { + return b.QueryDigestMembership(b.hasher.Do(event), version) +} + func (b Balloon) QueryConsistency(start, end uint64) (*IncrementalProof, error) { stats := metrics.Balloon stats.AddFloat("QueryConsistency", 1) diff --git a/client/client.go b/client/client.go index f2f198a00..48acb72cb 100644 --- a/client/client.go +++ b/client/client.go @@ -138,6 +138,26 @@ func (c HttpClient) Membership(key []byte, version uint64) (*protocol.Membership } +// Membership will ask for a Proof to the server. +func (c HttpClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (*protocol.MembershipResult, error) { + + query, _ := json.Marshal(&protocol.MembershipDigest{ + keyDigest, + version, + }) + + body, err := c.doReq("POST", "/proofs/digest-membership", query) + if err != nil { + return nil, err + } + + var proof *protocol.MembershipResult + json.Unmarshal(body, &proof) + + return proof, nil + +} + // Incremental will ask for an IncrementalProof to the server. func (c HttpClient) Incremental(start, end uint64) (*protocol.IncrementalResponse, error) { @@ -165,10 +185,9 @@ func uint2bytes(i uint64) []byte { // Verify will compute the Proof given in Membership and the snapshot from the // add and returns a proof of existence. - func (c HttpClient) Verify(result *protocol.MembershipResult, snap *protocol.Snapshot, hasherF func() hashing.Hasher) bool { - proof := protocol.ToBalloonProof([]byte(c.apiKey), result, hasherF) + proof := protocol.ToBalloonProof(result, hasherF) return proof.Verify(snap.EventDigest, &balloon.Snapshot{ snap.EventDigest, @@ -179,6 +198,20 @@ func (c HttpClient) Verify(result *protocol.MembershipResult, snap *protocol.Sna } +// Verify will compute the Proof given in Membership and the snapshot from the +// add and returns a proof of existence. +func (c HttpClient) DigestVerify(result *protocol.MembershipResult, snap *protocol.Snapshot, hasherF func() hashing.Hasher) bool { + + proof := protocol.ToBalloonProof(result, hasherF) + + return proof.DigestVerify(snap.EventDigest, &balloon.Snapshot{ + snap.EventDigest, + snap.HistoryDigest, + snap.HyperDigest, + snap.Version, + }) + +} func (c HttpClient) VerifyIncremental(result *protocol.IncrementalResponse, startSnapshot, endSnapshot *protocol.Snapshot, hasher hashing.Hasher) bool { proof := protocol.ToIncrementalProof(result, hasher) diff --git a/client/client_test.go b/client/client_test.go index 5e1d0ba11..5736315a3 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -106,6 +106,30 @@ func TestMembership(t *testing.T) { } +func TestDigestMembership(t *testing.T) { + tearDown := setup() + defer tearDown() + + event := "Hello world!" + version := uint64(0) + fakeResult := &protocol.MembershipResult{ + Key: []byte(event), + KeyDigest: []byte("digest"), + Exists: true, + Hyper: make(visitor.AuditPath), + History: make(visitor.AuditPath), + CurrentVersion: version, + QueryVersion: version, + ActualVersion: version, + } + resultJSON, _ := json.Marshal(fakeResult) + mux.HandleFunc("/proofs/digest-membership", okHandler(resultJSON)) + + result, err := client.MembershipDigest([]byte("digest"), version) + assert.NoError(t, err) + assert.Equal(t, fakeResult, result, "The results should match") +} + func TestMembershipWithServerFailure(t *testing.T) { tearDown := setup() defer tearDown() diff --git a/cmd/client_add.go b/cmd/client_add.go index ef84292d9..64cb9068e 100644 --- a/cmd/client_add.go +++ b/cmd/client_add.go @@ -40,7 +40,7 @@ func newAddCommand(ctx *clientContext) *cobra.Command { log.Infof(` Received snapshot with values: - EventDigest: %s + EventDigest: %x HyperDigest: %x HistoryDigest: %x Version: %d diff --git a/cmd/client_membership.go b/cmd/client_membership.go index 2665d30d9..56e6f7cff 100644 --- a/cmd/client_membership.go +++ b/cmd/client_membership.go @@ -29,9 +29,10 @@ import ( func newMembershipCommand(ctx *clientContext) *cobra.Command { + hasherF := hashing.NewSha256Hasher var version uint64 var verify bool - var key, hyperDigest, historyDigest string + var key, eventDigest, hyperDigest, historyDigest string cmd := &cobra.Command{ Use: "membership", @@ -50,28 +51,38 @@ func newMembershipCommand(ctx *clientContext) *cobra.Command { return nil }, RunE: func(cmd *cobra.Command, args []string) error { - log.Infof("Querying key [ %s ] with version [ %d ]\n", key, version) + var membershipResult *protocol.MembershipResult + var digest hashing.Digest + var err error - event := []byte(key) - proof, err := ctx.client.Membership(event, version) + if eventDigest == "" { + log.Infof("Querying key [ %s ] with version [ %d ]\n", key, version) + digest = hasherF().Do([]byte(key)) + + } else { + log.Infof("Querying digest [ %s ] with version [ %d ]\n", eventDigest, version) + digest, _ = hex.DecodeString(eventDigest) + } + + membershipResult, err = ctx.client.MembershipDigest(digest, version) if err != nil { return err } - log.Infof("Received proof: %+v\n", proof) - if verify { hdBytes, _ := hex.DecodeString(hyperDigest) htdBytes, _ := hex.DecodeString(historyDigest) - snapshot := &protocol.Snapshot{htdBytes, hdBytes, version, event} + snapshot := &protocol.Snapshot{htdBytes, hdBytes, version, digest} + + log.Infof("Verifying with Snapshot: \n\tEventDigest:%x\n\tHyperDigest: %s\n\tHistoryDigest: %s\n\tVersion: %d\n", + digest, hyperDigest, historyDigest, version) - log.Infof("Verifying with Snapshot: \n\tEventDigest:%s\n\tHyperDigest: %s\n\tHistoryDigest: %s\n\tVersion: %d\n", - event, hyperDigest, historyDigest, version) - if ctx.client.Verify(proof, snapshot, hashing.NewSha256Hasher) { + if ctx.client.Verify(membershipResult, snapshot, hasherF) { log.Info("Verify: OK") } else { log.Info("Verify: KO") } + } return nil @@ -81,10 +92,10 @@ func newMembershipCommand(ctx *clientContext) *cobra.Command { cmd.Flags().StringVar(&key, "key", "", "Key to query") cmd.Flags().Uint64Var(&version, "version", 0, "Version to query") cmd.Flags().BoolVar(&verify, "verify", false, "Do verify received proof") + cmd.Flags().StringVar(&eventDigest, "eventDigest", "", "Digest of the event") cmd.Flags().StringVar(&hyperDigest, "hyperDigest", "", "Digest of the hyper tree") cmd.Flags().StringVar(&historyDigest, "historyDigest", "", "Digest of the history tree") - cmd.MarkFlagRequired("key") cmd.MarkFlagRequired("version") return cmd diff --git a/gossip/auditor/auditor.go b/gossip/auditor/auditor.go index eaea3f408..512f71d92 100644 --- a/gossip/auditor/auditor.go +++ b/gossip/auditor/auditor.go @@ -17,10 +17,7 @@ package auditor import ( - "encoding/base64" "fmt" - "io/ioutil" - "net/http" "time" "github.com/bbva/qed/client" @@ -100,6 +97,7 @@ type MembershipTask struct { func (t *MembershipTask) Do() { proof, err := t.qed.Membership(t.s.Snapshot.EventDigest, t.s.Snapshot.Version) if err != nil { + // retry log.Errorf("Error executing incremental query: %v", err) } diff --git a/protocol/protocol.go b/protocol/protocol.go index 17a3e2a0c..b374af9c6 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -44,6 +44,13 @@ type MembershipQuery struct { Version uint64 } +// MembershipDigest is the public struct that apihttp.DigestMembership +// Handler uses to parse the post params. +type MembershipDigest struct { + KeyDigest hashing.Digest + Version uint64 +} + // Snapshot is the public struct that apihttp.Add Handler call returns. type Snapshot struct { HistoryDigest hashing.Digest @@ -166,14 +173,34 @@ func ToMembershipResult(key []byte, mp *balloon.MembershipProof) *MembershipResu } } -// ToBaloonProof translate public protocol.MembershipResult:w to internal +// ToBaloonProof translate public protocol.MembershipResult to internal // balloon.Proof. -func ToBalloonProof(id []byte, mr *MembershipResult, hasherF func() hashing.Hasher) *balloon.MembershipProof { - - historyProof := history.NewMembershipProof(mr.ActualVersion, mr.QueryVersion, mr.History, hasherF()) - hyperProof := hyper.NewQueryProof(mr.KeyDigest, util.Uint64AsBytes(mr.ActualVersion), mr.Hyper, hasherF()) - - return balloon.NewMembershipProof(mr.Exists, hyperProof, historyProof, mr.CurrentVersion, mr.ActualVersion, mr.QueryVersion, mr.KeyDigest, hasherF()) +func ToBalloonProof(mr *MembershipResult, hasherF func() hashing.Hasher) *balloon.MembershipProof { + + historyProof := history.NewMembershipProof( + mr.ActualVersion, + mr.QueryVersion, + mr.History, + hasherF(), + ) + + hyperProof := hyper.NewQueryProof( + mr.KeyDigest, + util.Uint64AsBytes(mr.ActualVersion), + mr.Hyper, + hasherF(), + ) + + return balloon.NewMembershipProof( + mr.Exists, + hyperProof, + historyProof, + mr.CurrentVersion, + mr.ActualVersion, + mr.QueryVersion, + mr.KeyDigest, + hasherF(), + ) } diff --git a/raftwal/fsm.go b/raftwal/fsm.go index f725c7557..046ca7e3d 100644 --- a/raftwal/fsm.go +++ b/raftwal/fsm.go @@ -90,6 +90,10 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, ag }, nil } +func (fsm *BalloonFSM) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) { + return fsm.balloon.QueryDigestMembership(keyDigest, version) +} + func (fsm *BalloonFSM) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) { return fsm.balloon.QueryMembership(event, version) } diff --git a/raftwal/raft.go b/raftwal/raft.go index fb9252b63..8d47913d9 100644 --- a/raftwal/raft.go +++ b/raftwal/raft.go @@ -52,6 +52,7 @@ var ( // RaftBalloon is the interface Raft-backed balloons must implement. type RaftBalloonApi interface { Add(event []byte) (*balloon.Snapshot, error) + QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) // Join joins the node, identified by nodeID and reachable at addr, to the cluster @@ -404,6 +405,10 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) { return resp.(*fsmAddResponse).snapshot, nil } +func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) { + return b.fsm.QueryDigestMembership(keyDigest, version) +} + func (b *RaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) { return b.fsm.QueryMembership(event, version) }