Skip to content

Commit

Permalink
Add publisher agent
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 10, 2019
1 parent 2a256fd commit d6a8b68
Showing 1 changed file with 147 additions and 7 deletions.
154 changes: 147 additions & 7 deletions cmd/agent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,159 @@ package cmd

import (
"context"
"fmt"
"time"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/util"
"github.com/octago/sflags/gen/gpflag"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
)

func newAgentPublisherCommand(ctx context.Context, args []string) *cobra.Command {
var (
QedPublisherInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_publisher_instances_count",
Help: "Number of publisher agents running.",
},
)

QedPublisherBatchesReceivedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_publisher_batches_received_total",
Help: "Number of batches received by publishers.",
},
)

QedPublisherBatchesProcessSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "qed_publisher_batches_process_seconds",
Help: "Duration of Publisher batch processing",
},
)
)

var agentPublisherCmd *cobra.Command = &cobra.Command{
Use: "publisher",
Short: "Provides access to the QED gossip publisher agent",
Long: `Start a QED publisher which process gossip messages sending batch
messages contents to the snapshot storage.`,
RunE: runAgentPublisher,
}

var agentPublisherCtx context.Context

func init() {
agentPublisherCtx = configPublisher()
agentPublisherCmd.MarkFlagRequired("notifier-servers")
agentPublisherCmd.MarkFlagRequired("store-servers")
agentCmd.AddCommand(agentPublisherCmd)
}

type publisherConfig struct {
Notifier *gossip.DefaultNotifierConfig
Store *gossip.RestSnapshotStoreConfig
Tasks *gossip.DefaultTasksManagerConfig
}

cmd := &cobra.Command{
Use: "publisher",
Short: "Start a QED publisher",
Long: `Start a QED publisher that reacts to snapshot batches propagated by QED servers and periodically publishes them to a certain log storage.`,
Run: func(cmd *cobra.Command, args []string) {
func newPublisherConfig() *publisherConfig {
return &publisherConfig{
Notifier: &gossip.DefaultNotifierConfig{
DialTimeout: 200 * time.Millisecond,
QueueTimeout: 100 * time.Millisecond,
ReadTimeout: 200 * time.Millisecond,
},
Store: &gossip.RestSnapshotStoreConfig{
DialTimeout: 200 * time.Millisecond,
QueueTimeout: 100 * time.Millisecond,
ReadTimeout: 200 * time.Millisecond,
},
Tasks: &gossip.DefaultTasksManagerConfig{
QueueTimeout: 100 * time.Millisecond,
Interval: 200 * time.Millisecond,
MaxTasks: 10,
},
}
}

func configPublisher() context.Context {
conf := newPublisherConfig()
err := gpflag.ParseTo(conf, agentPublisherCmd.PersistentFlags())
if err != nil {
log.Fatalf("err: %v", err)
}

ctx := context.WithValue(agentCtx, k("publisher.config"), conf)

return ctx
}

func runAgentPublisher(cmd *cobra.Command, args []string) error {
agentConfig := agentCtx.Value(k("agent.config")).(*gossip.Config)
conf := agentPublisherCtx.Value(k("publisher.config")).(*publisherConfig)

log.SetLogger("publisher", agentConfig.Log)

notifier := gossip.NewDefaultNotifierFromConfig(conf.Notifier)
tm := gossip.NewDefaultTasksManagerFromConfig(conf.Tasks)
store := gossip.NewRestSnapshotStoreFromConfig(conf.Store)

agent, err := gossip.NewDefaultAgent(agentConfig, nil, store, tm, notifier)
if err != nil {
return err
}

bp := gossip.NewBatchProcessor(agent, []gossip.TaskFactory{gossip.PrinterFactory{}, publisherFactory{}})
agent.In.Subscribe(gossip.BatchMessageType, bp, 255)
defer bp.Stop()

agent.Start()
util.AwaitTermSignal(agent.Shutdown)
return nil
}

type publisherFactory struct {
}

func (p publisherFactory) Metrics() []prometheus.Collector {
QedPublisherInstancesCount.Inc()
return []prometheus.Collector{
QedPublisherInstancesCount,
QedPublisherBatchesReceivedTotal,
QedPublisherBatchesProcessSeconds,
}
}

var errorNoSnapshots error = fmt.Errorf("No snapshots were found on this batch!!")

return cmd
func (p publisherFactory) New(ctx context.Context) gossip.Task {
QedPublisherBatchesReceivedTotal.Inc()

a := ctx.Value("agent").(*gossip.Agent)
b := ctx.Value("batch").(*protocol.BatchSnapshots)

return func() error {
timer := prometheus.NewTimer(QedPublisherBatchesProcessSeconds)
defer timer.ObserveDuration()

batch := new(protocol.BatchSnapshots)
batch.Snapshots = make([]*protocol.SignedSnapshot, 0)
for _, signedSnap := range b.Snapshots {
_, err := a.Cache.Get(signedSnap.Signature)
if err != nil {
log.Debugf("PublishingTask: add snapshot to be published")
a.Cache.Set(signedSnap.Signature, []byte{0x0}, 0)
batch.Snapshots = append(batch.Snapshots, signedSnap)
}
}

if len(batch.Snapshots) < 1 {
return errorNoSnapshots
}
log.Debugf("Sending batch to snapshot sotre: ", batch)
return a.SnapshotStore.PutBatch(batch)
}
}

0 comments on commit d6a8b68

Please sign in to comment.