Skip to content

Commit

Permalink
Include sender into the server package.
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 10, 2019
1 parent 58844f9 commit 8729a04
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
4 changes: 4 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
)

type Config struct {
//Log level
Log string

// Unique identifier to allow connections
APIKey string

Expand Down Expand Up @@ -77,6 +80,7 @@ func DefaultConfig() *Config {
currentDir := getCurrentDir()

return &Config{
Log: "info",
APIKey: "",
NodeID: hostname,
HTTPAddr: "127.0.0.1:8800",
Expand Down
156 changes: 156 additions & 0 deletions server/sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
"fmt"
"time"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/sign"
"github.com/prometheus/client_golang/prometheus"
)

var (
// SENDER

QedSenderInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_sender_instances_count",
Help: "Number of sender agents running",
},
)
QedSenderBatchesSentTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_sender_batches_sent_total",
Help: "Number of batches sent by the sender.",
},
)
)

type Sender struct {
agent *gossip.Agent
Interval time.Duration
BatchSize int
NumSenders int
TTL int
signer sign.Signer
quitCh chan bool
}

func NewSender(a *gossip.Agent, s sign.Signer, size, ttl, n int) *Sender {
QedSenderInstancesCount.Inc()
return &Sender{
agent: a,
Interval: 100 * time.Millisecond,
BatchSize: size,
NumSenders: n,
TTL: ttl,
signer: s,
quitCh: make(chan bool),
}
}

// Start NumSenders concurrent senders and waits for them
// to finish
func (s Sender) Start(ch chan *protocol.Snapshot) {
for i := 0; i < s.NumSenders; i++ {
log.Debugf("Starting sender %d", i)
go s.batcher(i, ch)
}
}

func (s Sender) RegisterMetrics(srv *metrics.Server) {
metrics := []prometheus.Collector{
QedSenderInstancesCount,
QedSenderBatchesSentTotal,
}
srv.MustRegister(metrics...)
}

func (s Sender) newBatch() *protocol.BatchSnapshots {
return &protocol.BatchSnapshots{
Snapshots: make([]*protocol.SignedSnapshot, 0),
}
}

// Sign snapshots, build batches of signed snapshots and send those batches
// to other members of the gossip network.
// If the out queue is full, we drop the current batch and pray other sender will
// send the batches to the gossip network.
func (s Sender) batcher(id int, ch chan *protocol.Snapshot) {
batch := s.newBatch()

for {
select {
case snap := <-ch:
if len(batch.Snapshots) == s.BatchSize {
payload, err := batch.Encode()
if err != nil {
log.Infof("Error encoding batch, dropping it")
continue
}
s.agent.Out.Publish(&gossip.Message{
Kind: gossip.BatchMessageType,
TTL: s.TTL,
Payload: payload,
})
batch = s.newBatch()
}
ss, err := s.doSign(snap)
if err != nil {
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)
case <-time.After(s.Interval):
// send whatever we have on each tick, do not wait
// to have complete batches
if len(batch.Snapshots) > 0 {
payload, err := batch.Encode()
if err != nil {
log.Infof("Error encoding batch, dropping it")
continue
}
s.agent.Out.Publish(&gossip.Message{
Kind: gossip.BatchMessageType,
TTL: s.TTL,
Payload: payload,
})
batch = s.newBatch()
}
case <-s.quitCh:
return
}
}
}

func (s Sender) Stop() {
QedSenderInstancesCount.Dec()
close(s.quitCh)
}

func (s *Sender) doSign(snapshot *protocol.Snapshot) (*protocol.SignedSnapshot, error) {
signature, err := s.signer.Sign([]byte(fmt.Sprintf("%v", snapshot)))
if err != nil {
log.Info("Publisher: error signing snapshot")
return nil, err
}
return &protocol.SignedSnapshot{Snapshot: snapshot, Signature: signature}, nil
}

0 comments on commit 8729a04

Please sign in to comment.