Skip to content

Commit

Permalink
Redesign gossip package to refine the API
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 10, 2019
1 parent eb4da9b commit 8a610d9
Show file tree
Hide file tree
Showing 29 changed files with 2,028 additions and 856 deletions.
471 changes: 189 additions & 282 deletions gossip/agent.go

Large diffs are not rendered by default.

99 changes: 25 additions & 74 deletions gossip/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,37 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/metrics"
)

func TestJoin(t *testing.T) {
conf := DefaultConfig()
conf.NodeName = "testNode"
conf.Role = member.Auditor
conf.Role = "auditor"
conf.BindAddr = "127.0.0.1:12345"
metricsServer := metrics.NewServer("127.0.0.2:23464")
a, _ := NewAgent(conf, []Processor{FakeProcessor{}}, metricsServer)

a, _ := NewAgentFromConfig(conf)
a.Start()

testCases := []struct {
agentState member.Status
agentState Status
addrs []string
expectedContactedHosts int
expectedErr error
}{
{
member.Alive,
AgentStatusAlive,
[]string{},
0,
nil,
},
{
member.Failed,
AgentStatusFailed,
[]string{},
0,
fmt.Errorf("Agent can't join after Leave or Shutdown"),
},
{
member.Alive,
AgentStatusAlive,
[]string{"127.0.0.1:12345"},
1,
nil,
Expand All @@ -66,45 +64,46 @@ func TestJoin(t *testing.T) {
require.Equal(t, c.expectedContactedHosts, result, "Wrong expected contacted hosts in test %d.", i)
require.Equal(t, c.expectedErr, err, "Wrong expected error in test %d.", i)
}
a.Shutdown()
}

func TestLeave(t *testing.T) {
conf := DefaultConfig()
conf.NodeName = "testNode"
conf.Role = member.Auditor
conf.Role = "auditor"
conf.BindAddr = "127.0.0.1:12346"
metricsServer := metrics.NewServer("127.0.0.2:13445")
a, _ := NewAgent(conf, []Processor{FakeProcessor{}}, metricsServer)

a, _ := NewAgentFromConfig(conf)
a.Start()
testCases := []struct {
agentState member.Status
agentState Status
expectedErr error
finalStatus member.Status
finalStatus Status
}{
{
member.Left,
AgentStatusLeft,
nil,
member.Left,
AgentStatusLeft,
},
{
member.Leaving,
AgentStatusLeaving,
fmt.Errorf("Leave already in progress"),
member.Leaving,
AgentStatusLeaving,
},
{
member.Shutdown,
AgentStatusShutdown,
fmt.Errorf("Leave called after Shutdown"),
member.Shutdown,
AgentStatusShutdown,
},
{
member.Alive,
AgentStatusAlive,
nil,
member.Left,
AgentStatusLeft,
},
{
member.Failed,
AgentStatusFailed,
nil,
member.Left,
AgentStatusLeft,
},
}

Expand All @@ -114,53 +113,5 @@ func TestLeave(t *testing.T) {
require.Equal(t, c.expectedErr, err, "Wrong expected error in test %d.", i)
require.Equal(t, c.finalStatus, a.Self.Status, "Wrong expected status in test %d.", i)
}
}

func TestShutdown(t *testing.T) {

conf := DefaultConfig()
conf.NodeName = "testNode"
conf.Role = member.Auditor
conf.BindAddr = "127.0.0.1:12347"
metricsServer := metrics.NewServer("127.0.0.2:43512")
a, _ := NewAgent(conf, []Processor{FakeProcessor{}}, metricsServer)

testCases := []struct {
agentState member.Status
expectedErr error
finalStatus member.Status
}{
{
member.Shutdown,
nil,
member.Shutdown,
},
{
member.Left,
nil,
member.Shutdown,
},
{
member.Alive,
nil,
member.Shutdown,
},
{
member.Failed,
nil,
member.Shutdown,
},
{
member.Leaving,
nil,
member.Shutdown,
},
}

for i, c := range testCases {
a.Self.Status = c.agentState
err := a.Shutdown()
require.Equal(t, c.expectedErr, err, "Wrong expected error in test %d.", i)
require.Equal(t, c.finalStatus, a.Self.Status, "Wrong expected status in test %d.", i)
}
a.Shutdown()
}
120 changes: 120 additions & 0 deletions gossip/auditor/auditor.go.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package auditor

import (
"context"
"fmt"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"

"github.com/prometheus/client_golang/prometheus"
)

var (
QedAuditorInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_auditor_instances_count",
Help: "Number of auditor agents running.",
},
)

QedAuditorBatchesProcessSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "qed_auditor_batches_process_seconds",
Help: "Duration of Auditor batch processing",
},
)

QedAuditorBatchesReceivedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_auditor_batches_received_total",
Help: "Number of batches received by auditors.",
},
)

QedAuditorGetMembershipProofErrTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_auditor_get_membership_proof_err_total",
Help: "Number of errors trying to get membership proofs by auditors.",
},
)
)

type Auditor struct{}

func (a Auditor) Metrics() []prometheus.Collector {
return []prometheus.Collector{
QedAuditorInstancesCount,
QedAuditorBatchesProcessSeconds,
QedAuditorBatchesReceivedTotal,
QedAuditorGetMembershipProofErrTotal,
}
}

func (a Auditor) Process(agent *gossip.Agent, ctx context.Context) error {
QedAuditorBatchesReceivedTotal.Inc()

qed := agent.Qed()
store := agent.SnapshotStore()
b := ctx.Value("batch").(*protocol.BatchSnapshots)
s := b.Snapshots[0]

return agent.Task(func() error {
timer := prometheus.NewTimer(QedAuditorBatchesProcessSeconds)
defer timer.ObserveDuration()

proof, err := qed.MembershipDigest(s.Snapshot.EventDigest, s.Snapshot.Version)
if err != nil {
log.Infof("Auditor is unable to get membership proof from QED server: %v", err)

switch fmt.Sprintf("%T", err) {
case "*errors.errorString":
agent.Alert(fmt.Sprintf("Auditor is unable to get membership proof from QED server: %v", err))
default:
QedAuditorGetMembershipProofErrTotal.Inc()
}

return err
}

storedSnap, err := store.GetSnapshot(proof.CurrentVersion)
if err != nil {
log.Infof("Unable to get snapshot from storage: %v", err)
return err
}

checkSnap := &protocol.Snapshot{
HistoryDigest: s.Snapshot.HistoryDigest,
HyperDigest: storedSnap.Snapshot.HyperDigest,
Version: s.Snapshot.Version,
EventDigest: s.Snapshot.EventDigest,
}

ok := qed.DigestVerify(proof, checkSnap, hashing.NewSha256Hasher)
if !ok {
agent.Alert(fmt.Sprintf("Unable to verify snapshot %v", s.Snapshot))
log.Infof("Unable to verify snapshot %v", s.Snapshot)
}

log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", s.Snapshot)
return nil
})
}
Loading

0 comments on commit 8a610d9

Please sign in to comment.