diff --git a/gossip/auditor/auditor.go.txt b/gossip/auditor/auditor.go.txt deleted file mode 100644 index 18697c149..000000000 --- a/gossip/auditor/auditor.go.txt +++ /dev/null @@ -1,120 +0,0 @@ -/* - Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package auditor - -import ( - "context" - "fmt" - - "github.com/bbva/qed/gossip" - "github.com/bbva/qed/hashing" - "github.com/bbva/qed/log" - "github.com/bbva/qed/protocol" - - "github.com/prometheus/client_golang/prometheus" -) - -var ( - QedAuditorInstancesCount = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "qed_auditor_instances_count", - Help: "Number of auditor agents running.", - }, - ) - - QedAuditorBatchesProcessSeconds = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "qed_auditor_batches_process_seconds", - Help: "Duration of Auditor batch processing", - }, - ) - - QedAuditorBatchesReceivedTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "qed_auditor_batches_received_total", - Help: "Number of batches received by auditors.", - }, - ) - - QedAuditorGetMembershipProofErrTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "qed_auditor_get_membership_proof_err_total", - Help: "Number of errors trying to get membership proofs by auditors.", - }, - ) -) - -type Auditor struct{} - -func (a Auditor) Metrics() []prometheus.Collector { - return []prometheus.Collector{ - QedAuditorInstancesCount, - QedAuditorBatchesProcessSeconds, - QedAuditorBatchesReceivedTotal, - QedAuditorGetMembershipProofErrTotal, - } -} - -func (a Auditor) Process(agent *gossip.Agent, ctx context.Context) error { - QedAuditorBatchesReceivedTotal.Inc() - - qed := agent.Qed() - store := agent.SnapshotStore() - b := ctx.Value("batch").(*protocol.BatchSnapshots) - s := b.Snapshots[0] - - return agent.Task(func() error { - timer := prometheus.NewTimer(QedAuditorBatchesProcessSeconds) - defer timer.ObserveDuration() - - proof, err := qed.MembershipDigest(s.Snapshot.EventDigest, s.Snapshot.Version) - if err != nil { - log.Infof("Auditor is unable to get membership proof from QED server: %v", err) - - switch fmt.Sprintf("%T", err) { - case "*errors.errorString": - agent.Alert(fmt.Sprintf("Auditor is unable to get membership proof from QED server: %v", err)) - default: - QedAuditorGetMembershipProofErrTotal.Inc() - } - - return err - } - - storedSnap, err := store.GetSnapshot(proof.CurrentVersion) - if err != nil { - log.Infof("Unable to get snapshot from storage: %v", err) - return err - } - - checkSnap := &protocol.Snapshot{ - HistoryDigest: s.Snapshot.HistoryDigest, - HyperDigest: storedSnap.Snapshot.HyperDigest, - Version: s.Snapshot.Version, - EventDigest: s.Snapshot.EventDigest, - } - - ok := qed.DigestVerify(proof, checkSnap, hashing.NewSha256Hasher) - if !ok { - agent.Alert(fmt.Sprintf("Unable to verify snapshot %v", s.Snapshot)) - log.Infof("Unable to verify snapshot %v", s.Snapshot) - } - - log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", s.Snapshot) - return nil - }) -} diff --git a/gossip/monitor/monitor.go.txt b/gossip/monitor/monitor.go.txt deleted file mode 100644 index 032883f56..000000000 --- a/gossip/monitor/monitor.go.txt +++ /dev/null @@ -1,101 +0,0 @@ -/* - Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package monitor - -import ( - "context" - "fmt" - - "github.com/bbva/qed/gossip" - "github.com/bbva/qed/hashing" - "github.com/bbva/qed/log" - "github.com/bbva/qed/protocol" - - "github.com/prometheus/client_golang/prometheus" -) - -var ( - QedMonitorInstancesCount = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "qed_monitor_instances_count", - Help: "Number of monitor agents running.", - }, - ) - - QedMonitorBatchesReceivedTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "qed_monitor_batches_received_total", - Help: "Number of batches received by monitors.", - }, - ) - - QedMonitorBatchesProcessSeconds = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "qed_monitor_batches_process_seconds", - Help: "Duration of Monitor batch processing", - }, - ) - - QedMonitorGetIncrementalProofErrTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "qed_monitor_get_incremental_proof_err_total", - Help: "Number of errors trying to get incremental proofs by monitors.", - }, - ) -) - -type Monitor struct{} - -func (m Monitor) Metrics() []prometheus.Collector { - return []prometheus.Collector{ - QedMonitorInstancesCount, - QedMonitorBatchesReceivedTotal, - QedMonitorBatchesProcessSeconds, - QedMonitorGetIncrementalProofErrTotal, - } -} - -// b *protocol.BatchSnapshots) -func (m Monitor) Process(agent *gossip.Agent, ctx context.Context) error { - QedMonitorBatchesReceivedTotal.Inc() - - qed := agent.Qed() - b := ctx.Value("batch").(*protocol.BatchSnapshots) - - return agent.Task(func() error { - timer := prometheus.NewTimer(QedMonitorBatchesProcessSeconds) - defer timer.ObserveDuration() - - first := b.Snapshots[0].Snapshot - last := b.Snapshots[len(b.Snapshots)-1].Snapshot - - resp, err := qed.Incremental(first.Version, last.Version) - if err != nil { - QedMonitorGetIncrementalProofErrTotal.Inc() - log.Infof("Monitor is unable to get incremental proof from QED server: %s", err.Error()) - return err - } - ok := qed.VerifyIncremental(resp, first, last, hashing.NewSha256Hasher()) - if !ok { - agent.Alert(fmt.Sprintf("Monitor is unable to verify incremental proof from %d to %d", first.Version, last.Version)) - log.Infof("Monitor is unable to verify incremental proof from %d to %d", first.Version, last.Version) - } - log.Debugf("Monitor verified a consistency proof between versions %d and %d: %v\n", first.Version, last.Version, ok) - return nil - }) -} - diff --git a/gossip/publisher/publisher.go.txt b/gossip/publisher/publisher.go.txt deleted file mode 100644 index 8952d1079..000000000 --- a/gossip/publisher/publisher.go.txt +++ /dev/null @@ -1,90 +0,0 @@ -/* - Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package publisher - -import ( - "context" - - "github.com/bbva/qed/gossip" - "github.com/bbva/qed/protocol" - - "github.com/prometheus/client_golang/prometheus" -) - -var ( - QedPublisherInstancesCount = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "qed_publisher_instances_count", - Help: "Number of publisher agents running.", - }, - ) - - QedPublisherBatchesReceivedTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "qed_publisher_batches_received_total", - Help: "Number of batches received by publishers.", - }, - ) - - QedPublisherBatchesProcessSeconds = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "qed_publisher_batches_process_seconds", - Help: "Duration of Publisher batch processing", - }, - ) -) - -type Publisher struct { -} - -func (p Publisher) Metrics() []prometheus.Collector { - return []prometheus.Collector{ - QedPublisherInstancesCount, - QedPublisherBatchesReceivedTotal, - QedPublisherBatchesProcessSeconds, - } -} - -func (p Publisher) New(ctx context.Context) Task { - QedPublisherBatchesReceivedTotal.Inc() - - a := ctx.Value("agent").(gossip.Agent) - b := ctx.Value("batch").(*protocol.BatchSnapshots) - - return func() error { - timer := prometheus.NewTimer(QedPublisherBatchesProcessSeconds) - defer timer.ObserveDuration() - - var batch protocol.BatchSnapshots - - for _, signedSnap := range b.Snapshots { - _, err := a.Cache.Get(signedSnap.Signature) - if err != nil { - a.Cache.Set(signedSnap.Signature, []byte{0x0}, 0) - batch.Snapshots = append(batch.Snapshots, signedSnap) - } - } - if len(batch.Snapshots) < 1 { - return nil - } - - batch.From = b.From - batch.TTL = b.TTL - - return a.SnapshotStore.PutBatch(&batch) - } -} diff --git a/gossip/sender/sender.go.txt b/gossip/sender/sender.go.txt deleted file mode 100644 index 33a6512c9..000000000 --- a/gossip/sender/sender.go.txt +++ /dev/null @@ -1,157 +0,0 @@ -/* - Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package sender - -import ( - "fmt" - "time" - - "github.com/bbva/qed/gossip" - "github.com/bbva/qed/log" - "github.com/bbva/qed/metrics" - "github.com/bbva/qed/protocol" - "github.com/bbva/qed/sign" - "github.com/prometheus/client_golang/prometheus" -) - -var ( - // SENDER - - QedSenderInstancesCount = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "qed_sender_instances_count", - Help: "Number of sender agents running", - }, - ) - QedSenderBatchesSentTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "qed_sender_batches_sent_total", - Help: "Number of batches sent by the sender.", - }, - ) -) - -type Sender struct { - agent *gossip.Agent - signer sign.Signer - config *Config - outCh chan *protocol.BatchSnapshots - quitCh chan bool -} - -type Config struct { - BatchSize int - BatchInterval time.Duration - NumSenders int - TTL int - EachN int - SendTimer time.Duration -} - -func DefaultConfig() *Config { - return &Config{ - BatchSize: 100, - BatchInterval: 1 * time.Second, - NumSenders: 3, - TTL: 1, - EachN: 1, - SendTimer: 1000 * time.Millisecond, - } -} - -func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender { - QedSenderInstancesCount.Inc() - return &Sender{ - agent: a, - signer: s, - outCh: make(chan *protocol.BatchSnapshots, 1<<16), - quitCh: make(chan bool), - } -} - -// Start NumSenders concurrent senders and waits for them -// to finish -func (s Sender) Start(ch chan *protocol.Snapshot) { - for i := 0; i < s.config.NumSenders; i++ { - log.Debugf("starting sender %d", i) - go s.batcher(i, ch) - } -} - -func (s Sender) RegisterMetrics(srv *metrics.Server) { - metrics := []prometheus.Collector{ - QedSenderInstancesCount, - QedSenderBatchesSentTotal, - } - srv.MustRegister(metrics...) -} - -func (s Sender) newBatch() *protocol.BatchSnapshots { - return &protocol.BatchSnapshots{ - TTL: s.config.TTL, - From: s.agent.Self(), - Snapshots: make([]*protocol.SignedSnapshot, 0), - } -} - -// Sign snapshots, build batches of signed snapshots and send those batches -// to other members of the gossip network. -// If the out queue is full, we drop the current batch and pray other sender will -// send the batches to the gossip network. -func (s Sender) batcher(id int, ch chan *protocol.Snapshot) { - batch := s.newBatch() - - for { - select { - case snap := <-ch: - if len(batch.Snapshots) == s.config.BatchSize { - s.agent.Out(batch) - batch = s.newBatch() - } - ss, err := s.doSign(snap) - if err != nil { - log.Errorf("Failed signing message: %v", err) - } - batch.Snapshots = append(batch.Snapshots, ss) - case b := <-s.outCh: - s.agent.Send(b) - case <-time.After(s.config.SendTimer): - // send whatever we have on each tick, do not wait - // to have complete batches - if len(batch.Snapshots) > 0 { - s.agent.Out(batch) - batch = s.newBatch() - } - case <-s.quitCh: - return - } - } -} - -func (s Sender) Stop() { - QedSenderInstancesCount.Dec() - close(s.quitCh) -} - -func (s *Sender) doSign(snapshot *protocol.Snapshot) (*protocol.SignedSnapshot, error) { - signature, err := s.signer.Sign([]byte(fmt.Sprintf("%v", snapshot))) - if err != nil { - log.Info("Publisher: error signing snapshot") - return nil, err - } - return &protocol.SignedSnapshot{Snapshot: snapshot, Signature: signature}, nil -}