Skip to content

Commit

Permalink
Refactor config params
Browse files Browse the repository at this point in the history
  • Loading branch information
iknite committed Feb 19, 2019
1 parent c07f5a8 commit ad9933a
Showing 1 changed file with 55 additions and 52 deletions.
107 changes: 55 additions & 52 deletions tests/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/bbva/qed/log"
)

type Riot struct {
type Config struct {
// flags
endpoint string
apiKey string
Expand All @@ -52,67 +52,68 @@ func main() {
}

func newRiotCommand() *cobra.Command {
riot := Riot{}
config := Config{}

cmd := &cobra.Command{
Use: "riot",
Short: "Stresser tool for qed server",
PreRun: func(cmd *cobra.Command, args []string) {
riot.clusterSize = uint(viper.GetInt("cluster_size"))
if riot.clusterSize != 0 && riot.clusterSize != 2 && riot.clusterSize != 4 {
log.Fatalf("invalid cluster-size specified: %d (only 2 or 4)", riot.clusterSize)
config.clusterSize = uint(viper.GetInt("cluster_size"))
if config.clusterSize != 0 && config.clusterSize != 2 && config.clusterSize != 4 {
log.Fatalf("invalid cluster-size specified: %d (only 2 or 4)", config.clusterSize)
}
},
RunE: func(cmd *cobra.Command, args []string) error {
return Run(riot)
return Run(config)
},
}

f := cmd.Flags()
f.StringVar(&riot.endpoint, "endpoint", "http://localhost:8800", "The endopoint to make the load")
f.StringVar(&riot.apiKey, "apikey", "my-key", "The key to use qed servers")
f.BoolVar(&riot.insecure, "insecure", false, "Allow self-signed TLS certificates")
f.BoolVar(&riot.wantAdd, "add", false, "Execute add benchmark")
f.BoolVarP(&riot.wantMembership, "membership", "m", false, "Benchmark MembershipProof")
f.BoolVar(&riot.wantIncremental, "incremental", false, "Execute Incremental benchmark")
f.BoolVar(&riot.offload, "offload", false, "Perform reads only on %50 of the cluster size (With cluster size 2 reads will be performed only on follower1)")
f.BoolVar(&riot.charts, "charts", false, "Create charts while executing the benchmarks. Output: graph-$testname.png")
f.BoolVar(&riot.profiling, "profiling", false, "Enable Go profiling with pprof tool. $ go tool pprof -http : http://localhost:6061 ")
f.UintVarP(&riot.incrementalDelta, "delta", "d", 1000, "Specify delta for the IncrementalProof")
f.UintVar(&riot.numRequests, "n", 10e4, "Number of requests for the attack")
f.UintVar(&riot.maxGoRoutines, "r", 10, "Set the concurrency value")
f.UintVar(&riot.offset, "offset", 0, "The starting version from which we start the load")
f.UintVar(&riot.clusterSize, "cluster-size", 0, "")
f.StringVar(&config.endpoint, "endpoint", "http://localhost:8800", "The endopoint to make the load")
f.StringVar(&config.apiKey, "apikey", "my-key", "The key to use qed servers")
f.BoolVar(&config.insecure, "insecure", false, "Allow self-signed TLS certificates")
f.BoolVar(&config.wantAdd, "add", false, "Execute add benchmark")
f.BoolVarP(&config.wantMembership, "membership", "m", false, "Benchmark MembershipProof")
f.BoolVar(&config.wantIncremental, "incremental", false, "Execute Incremental benchmark")
f.BoolVar(&config.offload, "offload", false, "Perform reads only on %50 of the cluster size (With cluster size 2 reads will be performed only on follower1)")
f.BoolVar(&config.charts, "charts", false, "Create charts while executing the benchmarks. Output: graph-$testname.png")
f.BoolVar(&config.profiling, "profiling", false, "Enable Go profiling with pprof tool. $ go tool pprof -http : http://localhost:6061 ")
f.UintVarP(&config.incrementalDelta, "delta", "d", 1000, "Specify delta for the IncrementalProof")
f.UintVar(&config.numRequests, "n", 10e4, "Number of requests for the attack")
f.UintVar(&config.maxGoRoutines, "r", 10, "Set the concurrency value")
f.UintVar(&config.offset, "offset", 0, "The starting version from which we start the load")
f.UintVar(&config.clusterSize, "cluster-size", 0, "")

_ = viper.BindPFlag("cluster_size", f.Lookup("cluster-size"))
_ = viper.BindEnv("cluster_size", "CLUSTER_SIZE")

return cmd
}

func Run(r Riot) error {
func Run(conf Config) error {
var attack Attack

if r.wantAdd { // nolint:gocritic
if conf.wantAdd { // nolint:gocritic
log.Info("Benchmark ADD")
attack = Attack{
kind: "add",
}
} else if r.wantMembership {
} else if conf.wantMembership {
log.Info("Benchmark MEMBERSHIP")

attack = Attack{
kind: "membership",
balloonVersion: uint64(r.numRequests + r.offset - 1),
balloonVersion: uint64(conf.numRequests + conf.offset - 1),
}
} else if r.wantIncremental {
} else if conf.wantIncremental {
log.Info("Benchmark INCREMENTAL")

attack = Attack{
kind: "incremental",
}
}
attack.riot = r

attack.config = conf

attack.Run()
return nil
Expand All @@ -122,9 +123,10 @@ type Attack struct {
kind string
balloonVersion uint64

riot Riot
client *client.HTTPClient
ch chan Task
config Config
client *client.HTTPClient
reqChan chan uint
senChan chan Task
}

type Task struct {
Expand All @@ -137,67 +139,72 @@ type Task struct {

func (a *Attack) Run() {
a.CreateFanOut()
a.FanIn()
a.CreateFanIn()

for i := a.config.offset; i < a.config.offset+a.config.numRequests; i++ {
a.reqChan <- i
}

}
func (a *Attack) Shutdown() {
close(a.reqChan)
close(a.senChan)
}

func (a *Attack) FanIn() {
reqChan := make(chan uint, a.riot.numRequests)
func (a *Attack) CreateFanIn() {
a.reqChan = make(chan uint, a.config.numRequests/100)

for rID := uint(0); rID < a.riot.maxGoRoutines; rID++ {
for rID := uint(0); rID < a.config.maxGoRoutines; rID++ {
go func(rID uint) {
for {
id, ok := <-reqChan
id, ok := <-a.reqChan
if !ok {
log.Infof("Closing mux chan #%d", rID)
return
}
switch a.kind {
case "add":
a.ch <- Task{
a.senChan <- Task{
kind: a.kind,
event: fmt.Sprintf("event %d", id),
}
case "membership":
a.ch <- Task{
a.senChan <- Task{
kind: a.kind,
key: []byte(fmt.Sprintf("event %d", id)),
version: a.balloonVersion,
}

case "incremental":
a.ch <- Task{
a.senChan <- Task{
kind: a.kind,
start: uint64(id),
end: uint64(id + a.riot.incrementalDelta),
end: uint64(id + a.config.incrementalDelta),
}
}
}
}(rID)
}

for i := a.riot.offset; i < a.riot.offset+a.riot.numRequests; i++ {
reqChan <- i
}

}

func (a *Attack) CreateFanOut() {

cConf := client.DefaultConfig()
cConf.Endpoint = a.riot.endpoint
cConf.APIKey = a.riot.apiKey
cConf.Insecure = a.riot.insecure
cConf.Endpoint = a.config.endpoint
cConf.APIKey = a.config.apiKey
cConf.Insecure = a.config.insecure
a.client = client.NewHTTPClient(*cConf)
if err := a.client.Ping(); err != nil {
panic(err)
}
a.ch = make(chan Task, a.riot.numRequests)
a.senChan = make(chan Task, a.config.numRequests/100)

for rID := uint(0); rID < a.riot.maxGoRoutines; rID++ {
for rID := uint(0); rID < a.config.maxGoRoutines; rID++ {

go func(rID uint) {
for {
task, ok := <-a.ch
task, ok := <-a.senChan
if !ok {
log.Infof("Closing demux chan #%d", rID)
return
Expand All @@ -215,7 +222,3 @@ func (a *Attack) CreateFanOut() {
}(rID)
}
}

func (a *Attack) Shutdown() {
close(a.ch)
}

0 comments on commit ad9933a

Please sign in to comment.