Skip to content

Commit

Permalink
Add metrics, profiling and charts to new riot
Browse files Browse the repository at this point in the history
  • Loading branch information
iknite committed Feb 24, 2019
1 parent 8296a96 commit 57ace16
Showing 1 changed file with 156 additions and 31 deletions.
187 changes: 156 additions & 31 deletions tests/riot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@ import (
"fmt"
"net/http"
"os"
"time"

"github.com/imdario/mergo"
"github.com/spf13/cobra"
"github.com/spf13/viper"
chart "github.com/wcharczuk/go-chart"

"github.com/bbva/qed/client"
"github.com/bbva/qed/log"
)

type Config struct {
// flags
Endpoint string
APIKey string
Insecure bool
WantAdd bool
WantIncremental bool
WantMembership bool
// general conf
Endpoint string
APIKey string
Insecure bool

// load kinds
Add bool
Incremental bool
Membership bool

// stress conf
Offload bool
Charts bool
Profiling bool
Expand All @@ -46,6 +52,9 @@ type Config struct {
NumRequests uint
MaxGoRoutines uint
ClusterSize uint

// metrics
counter float64
}

func main() {
Expand All @@ -55,35 +64,53 @@ func main() {
}

func newRiotCommand() *cobra.Command {
// Input storage.
var logLevel string
var APIMode bool
config := Config{}

cmd := &cobra.Command{
Use: "riot",
Short: "Stresser tool for qed server",
PreRun: func(cmd *cobra.Command, args []string) {

log.SetLogger("Riot", logLevel)

config.ClusterSize = uint(viper.GetInt("cluster_size"))

if config.ClusterSize != 0 && config.ClusterSize != 2 && config.ClusterSize != 4 {
log.Fatalf("invalid cluster-size specified: %d (only 2 or 4)", config.ClusterSize)
}

},
Run: func(cmd *cobra.Command, args []string) {
if config.Profiling {
go func() {
log.Info("Go profiling enabled\n")
log.Info(http.ListenAndServe(":6061", nil))
}()
}

if APIMode {
Serve(config)
} else {
Run(config)
}

},
}

f := cmd.Flags()

f.StringVarP(&logLevel, "log", "l", "debug", "Choose between log levels: silent, error, info and debug")
f.BoolVar(&APIMode, "api", false, "Raise a HTTP api in port 11111 ")

f.StringVar(&config.Endpoint, "endpoint", "http://localhost:8800", "The endopoint to make the load")
f.StringVar(&config.APIKey, "apikey", "my-key", "The key to use qed servers")
f.BoolVar(&config.Insecure, "insecure", false, "Allow self-signed TLS certificates")
f.BoolVar(&config.WantAdd, "add", false, "Execute add benchmark")
f.BoolVarP(&config.WantMembership, "membership", "m", false, "Benchmark MembershipProof")
f.BoolVar(&config.WantIncremental, "incremental", false, "Execute Incremental benchmark")
f.BoolVar(&config.Add, "add", false, "Execute add benchmark")
f.BoolVarP(&config.Membership, "membership", "m", false, "Benchmark MembershipProof")
f.BoolVar(&config.Incremental, "incremental", false, "Execute Incremental benchmark")
f.BoolVar(&config.Offload, "offload", false, "Perform reads only on %50 of the cluster size (With cluster size 2 reads will be performed only on follower1)")
f.BoolVar(&config.Charts, "charts", false, "Create charts while executing the benchmarks. Output: graph-$testname.png")
f.BoolVar(&config.Profiling, "profiling", false, "Enable Go profiling with pprof tool. $ go tool pprof -http : http://localhost:6061 ")
Expand All @@ -99,7 +126,15 @@ func newRiotCommand() *cobra.Command {
return cmd
}

func Serve(defaultConf Config) {
func Run(paramsConf Config) {
setupMetrics(paramsConf)
newAttack(paramsConf)
}

func Serve(paramsConf Config) {

setupMetrics(paramsConf)

mux := http.NewServeMux()
mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
Expand All @@ -120,50 +155,60 @@ func Serve(defaultConf Config) {
return
}

var localConf Config
if err := mergo.Merge(&localConf, defaultConf); err != nil {
var conf Config
if err := mergo.Merge(&conf, paramsConf); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if err := mergo.Merge(&localConf, newConf); err != nil {
if err := mergo.Merge(&conf, newConf); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

fmt.Printf(">>>>>>>>>>>>> %+v", localConf)
go newAttack(conf)

})

api := &http.Server{
Addr: ":18800",
Handler: mux,
}

log.Debug(" * Starting Riot HTTP server")
if err := api.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start Riot API HTTP server: %s", err)
}
}

func Run(conf Config) {
type kind string

const (
add kind = "add"
membership kind = "membership"
incremental kind = "incremental"
)

func newAttack(conf Config) {
var attack Attack

if conf.WantAdd { // nolint:gocritic
if conf.Add { // nolint:gocritic
log.Info("Benchmark ADD")
attack = Attack{
kind: "add",
kind: add,
}
} else if conf.WantMembership {
} else if conf.Membership {
log.Info("Benchmark MEMBERSHIP")

attack = Attack{
kind: "membership",
kind: membership,
balloonVersion: uint64(conf.NumRequests + conf.Offset - 1),
}
} else if conf.WantIncremental {
} else if conf.Incremental {
log.Info("Benchmark INCREMENTAL")

attack = Attack{
kind: "incremental",
kind: incremental,
}
}

Expand All @@ -173,7 +218,7 @@ func Run(conf Config) {
}

type Attack struct {
kind string
kind kind
balloonVersion uint64

config Config
Expand All @@ -183,7 +228,7 @@ type Attack struct {
}

type Task struct {
kind string
kind kind

event string
key []byte
Expand Down Expand Up @@ -216,25 +261,25 @@ func (a *Attack) CreateFanIn() {
return
}
switch a.kind {
case "add":
case add:
a.senChan <- Task{
kind: a.kind,
event: fmt.Sprintf("event %d", id),
}
case "membership":
case membership:
a.senChan <- Task{
kind: a.kind,
key: []byte(fmt.Sprintf("event %d", id)),
version: a.balloonVersion,
}

case "incremental":
case incremental:
a.senChan <- Task{
kind: a.kind,
start: uint64(id),
end: uint64(id + a.config.IncrementalDelta),
}
}
a.config.counter++
}
}(rID)
}
Expand All @@ -251,6 +296,7 @@ func (a *Attack) CreateFanOut() {
if err := a.client.Ping(); err != nil {
panic(err)
}

a.senChan = make(chan Task, a.config.NumRequests/100)

for rID := uint(0); rID < a.config.MaxGoRoutines; rID++ {
Expand All @@ -264,14 +310,93 @@ func (a *Attack) CreateFanOut() {
}

switch task.kind {
case "add":
case add:
_, _ = a.client.Add(task.event)
case "membership":
case membership:
_, _ = a.client.Membership(task.key, task.version)
case "incremental":
case incremental:
_, _ = a.client.Incremental(task.start, task.end)
}
a.config.counter++
}
}(rID)
}
}

func chartsData(a *axis, elapsed, reqs float64) *axis {
a.x = append(a.x, elapsed)
a.y = append(a.y, reqs)

return a
}

func setupMetrics(conf Config) {
graph := &axis{}
ticker := time.NewTicker(1 * time.Second)
start := time.Now()
defer ticker.Stop()

if conf.Charts {
if err := os.Mkdir("results", 0755); err != nil {
log.Error("Unable to create `results` folder")
}
}

go func() {
for {
<-ticker.C
elapsed := time.Since(start).Seconds()
if conf.Charts {
go drawChart(conf, chartsData(graph, elapsed, conf.counter/elapsed))
}
summaryPerDuration(conf, elapsed)
}
}()

}

func summaryPerDuration(conf Config, elapsed float64) {

log.Infof(
"Throughput: %.0f req/s | Concurrency: %d | Elapsed time: %.3f seconds\n",
conf.counter/elapsed,
conf.MaxGoRoutines,
elapsed,
)
}

type axis struct {
x, y []float64
}

func drawChart(conf Config, a *axis) {
graph := chart.Chart{
XAxis: chart.XAxis{
Name: "Time",
NameStyle: chart.StyleShow(),
Style: chart.StyleShow(),
},
YAxis: chart.YAxis{
Name: "Reqests",
NameStyle: chart.StyleShow(),
Style: chart.StyleShow(),
},
Series: []chart.Series{
chart.ContinuousSeries{
Style: chart.Style{
Show: true,
StrokeColor: chart.GetDefaultColor(0).WithAlpha(64),
FillColor: chart.GetDefaultColor(0).WithAlpha(64),
},

XValues: a.x,
YValues: a.y,
},
},
}

req := fmt.Sprint(conf.NumRequests)
file, _ := os.Create("results/graph-" + req + ".png")
defer file.Close()
_ = graph.Render(chart.PNG, file)
}

0 comments on commit 57ace16

Please sign in to comment.