diff --git a/balloon/balloon.go b/balloon/balloon.go index bd7f07b5c..45856ad93 100644 --- a/balloon/balloon.go +++ b/balloon/balloon.go @@ -17,6 +17,7 @@ package balloon import ( + "errors" "fmt" "sync" @@ -265,35 +266,38 @@ func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) leaf, err = b.store.Get(storage.IndexPrefix, proof.KeyDigest) switch { case err != nil && err != storage.ErrKeyNotFound: - return nil, fmt.Errorf("Error reading leaf %v data: %v", proof.KeyDigest, err) + return nil, fmt.Errorf("error reading leaf %v data: %v", proof.KeyDigest, err) + case err != nil && err == storage.ErrKeyNotFound: proof.Exists = false proof.ActualVersion = version - leaf = &storage.KVPair{keyDigest, util.Uint64AsBytes(version)} + leaf = &storage.KVPair{Key: keyDigest, Value: util.Uint64AsBytes(version)} + case err == nil: proof.Exists = true proof.ActualVersion = util.BytesAsUint64(leaf.Value) - } - if proof.ActualVersion <= version { - wg.Add(1) - go func() { - defer wg.Done() - historyProof, historyErr = b.historyTree.ProveMembership(proof.ActualVersion, version) - }() - } else { - return nil, fmt.Errorf("Query version %d is not on history tree which version is %d", version, proof.ActualVersion) + if proof.ActualVersion <= version { + wg.Add(1) + go func() { + defer wg.Done() + historyProof, historyErr = b.historyTree.ProveMembership(proof.ActualVersion, version) + }() + } else { + return nil, fmt.Errorf("query version %d is not on history tree which version is %d", version, proof.ActualVersion) + } + } hyperProof, hyperErr = b.hyperTree.QueryMembership(leaf.Key, leaf.Value) wg.Wait() if hyperErr != nil { - return nil, fmt.Errorf("Unable to get proof from hyper tree: %v", err) + return nil, fmt.Errorf("unable to get proof from hyper tree: %v", err) } if historyErr != nil { - return nil, fmt.Errorf("Unable to get proof from history tree: %v", err) + return nil, fmt.Errorf("unable to get proof from history tree: %v", err) } proof.HyperProof = hyperProof @@ -310,20 +314,25 @@ func (b Balloon) QueryConsistency(start, end uint64) (*IncrementalProof, error) // Metrics metrics.QedBalloonIncrementalTotal.Inc() - //timer := prometheus.NewTimer(metrics.QedBalloonIncrementalDurationSeconds) - //defer timer.ObserveDuration() stats := metrics.Balloon stats.AddFloat("QueryConsistency", 1) var proof IncrementalProof + if start >= b.version || + end >= b.version || + start >= end { + + return nil, errors.New("unable to process proof from history tree: invalid range") + } + proof.Start = start proof.End = end proof.Hasher = b.hasherF() historyProof, err := b.historyTree.ProveConsistency(start, end) if err != nil { - return nil, fmt.Errorf("Unable to get proof from history tree: %v", err) + return nil, fmt.Errorf("unable to get proof from history tree: %v", err) } proof.AuditPath = historyProof.AuditPath diff --git a/balloon/balloon_test.go b/balloon/balloon_test.go index f0c3a3eb8..2c91ca05e 100644 --- a/balloon/balloon_test.go +++ b/balloon/balloon_test.go @@ -71,6 +71,10 @@ func TestQueryMembership(t *testing.T) { {[]byte{0x5a}, uint64(0)}, } + // Asking for a future/wrong membership should not fail + _, err = balloon.QueryMembership([]byte{0x10}, 15) + require.NoError(t, err) + for i, c := range testCases { _, mutations, err := balloon.Add(c.key) require.NoErrorf(t, err, "Error adding event %d", i) @@ -165,6 +169,9 @@ func TestQueryConsistencyProof(t *testing.T) { balloon, err := NewBalloon(store, hashing.NewFakeXorHasher) require.NoError(t, err) + _, err = balloon.QueryConsistency(uint64(30), uint64(600)) + require.Error(t, err, "Asking for a future/wrong consitency should fail") + for j := 0; j <= int(c.end); j++ { _, mutations, err := balloon.Add(util.Uint64AsBytes(uint64(j))) require.NoErrorf(t, err, "Error adding event %d", j) diff --git a/client/client.go b/client/client.go index b453d32d9..cfbdbe105 100644 --- a/client/client.go +++ b/client/client.go @@ -33,7 +33,6 @@ import ( "github.com/bbva/qed/balloon" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" - "github.com/bbva/qed/metrics" "github.com/bbva/qed/protocol" ) @@ -212,7 +211,6 @@ func (c HTTPClient) Ping() error { // Add will do a request to the server with a post data to store a new event. func (c *HTTPClient) Add(event string) (*protocol.Snapshot, error) { - metrics.ClientEventAdd.Inc() data, _ := json.Marshal(&protocol.Event{Event: []byte(event)}) body, err := c.doReq("POST", "/events", data) @@ -230,8 +228,6 @@ func (c *HTTPClient) Add(event string) (*protocol.Snapshot, error) { // Membership will ask for a Proof to the server. func (c *HTTPClient) Membership(key []byte, version uint64) (*protocol.MembershipResult, error) { - metrics.ClientQueryMembership.Inc() - query, _ := json.Marshal(&protocol.MembershipQuery{ Key: key, Version: version, @@ -252,8 +248,6 @@ func (c *HTTPClient) Membership(key []byte, version uint64) (*protocol.Membershi // Membership will ask for a Proof to the server. func (c *HTTPClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (*protocol.MembershipResult, error) { - metrics.ClientQueryMembership.Inc() - query, _ := json.Marshal(&protocol.MembershipDigest{ KeyDigest: keyDigest, Version: version, @@ -274,8 +268,6 @@ func (c *HTTPClient) MembershipDigest(keyDigest hashing.Digest, version uint64) // Incremental will ask for an IncrementalProof to the server. func (c *HTTPClient) Incremental(start, end uint64) (*protocol.IncrementalResponse, error) { - metrics.ClientQueryIncremental.Inc() - query, _ := json.Marshal(&protocol.IncrementalRequest{ Start: start, End: end, diff --git a/metrics/metrics.go b/metrics/metrics.go index a0b4cdd7b..51308d20e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -151,27 +151,6 @@ var ( }, ) - // Client - - ClientEventAdd = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "client_event_add", - Help: "Number of events added into the cluster.", - }, - ) - ClientQueryMembership = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "client_query_membership", - Help: "Number of single events directly verified into the cluster.", - }, - ) - ClientQueryIncremental = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "client_query_incremental", - Help: "Number of range of verified events queried into the cluster.", - }, - ) - // PROMETHEUS metricsList = []prometheus.Collector{ @@ -190,10 +169,6 @@ var ( QedSenderInstancesCount, QedSenderBatchesSentTotal, - - ClientEventAdd, - ClientQueryMembership, - ClientQueryIncremental, } registerMetrics sync.Once diff --git a/protocol/protocol.go b/protocol/protocol.go index 592c9f565..8e0365f0b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -125,10 +125,16 @@ type IncrementalResponse struct { // ToMembershipProof translates internal api balloon.MembershipProof to the // public struct protocol.MembershipResult. func ToMembershipResult(key []byte, mp *balloon.MembershipProof) *MembershipResult { + + var serialized map[string]hashing.Digest + if mp.HistoryProof != nil && mp.HistoryProof.AuditPath != nil { + serialized = mp.HistoryProof.AuditPath.Serialize() + } + return &MembershipResult{ mp.Exists, mp.HyperProof.AuditPath, - mp.HistoryProof.AuditPath.Serialize(), + serialized, mp.CurrentVersion, mp.QueryVersion, mp.ActualVersion, diff --git a/tests/riot.go b/tests/riot.go index bcfcd0ef9..74535dbc1 100644 --- a/tests/riot.go +++ b/tests/riot.go @@ -17,531 +17,320 @@ package main import ( - "bufio" - "bytes" - "crypto/tls" "encoding/json" - "flag" "fmt" - "io" - "io/ioutil" - "log" - "math" "net/http" - _ "net/http/pprof" "os" - "strconv" "sync" - "time" - "github.com/bbva/qed/protocol" -) + "github.com/imdario/mergo" + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" -var ( - endpoint string - apiKey string - wantAdd bool - wantIncremental bool - wantMembership bool - offload bool - - profiling bool - incrementalDelta int - offset int - numRequests int - readConcurrency int - writeConcurrency int - delay_ms int + "github.com/bbva/qed/api/metricshttp" + "github.com/bbva/qed/client" + "github.com/bbva/qed/log" ) -func init() { - // Create a default config to use as default values in flags - config := NewDefaultConfig() - - flag.StringVar(&endpoint, "endpoint", "http://localhost:8800", "The endopoint to make the load") - flag.StringVar(&apiKey, "apikey", "my-key", "The key to use qed servers") - flag.BoolVar(&wantAdd, "add", false, "Execute add benchmark") - flag.IntVar(&delay_ms, "delay", 0, "Set request delay in milliseconds") - - usage := "Benchmark MembershipProof" - flag.BoolVar(&wantMembership, "membership", false, usage) - flag.BoolVar(&wantMembership, "m", false, usage+" (shorthand)") - - flag.BoolVar(&wantIncremental, "incremental", false, "Execute Incremental benchmark") - flag.BoolVar(&offload, "offload", false, "Perform reads only on %50 of the cluster size (With cluster size 2 reads will be performed only on follower1)") - flag.BoolVar(&profiling, "profiling", false, "Enable Go profiling with pprof tool. $ go tool pprof -http : http://localhost:6061 ") - - usageDelta := "Specify delta for the IncrementalProof" - flag.IntVar(&incrementalDelta, "delta", 1000, usageDelta) - flag.IntVar(&incrementalDelta, "d", 1000, usageDelta+" (shorthand)") - - flag.IntVar(&numRequests, "n", 10e4, "Number of requests for the attack") - flag.IntVar(&readConcurrency, "r", config.maxGoRoutines, "Set read concurrency value") - flag.IntVar(&writeConcurrency, "w", config.maxGoRoutines, "Set write concurrency value") - flag.IntVar(&offset, "offset", 0, "The starting version from which we start the load") -} - -type Config struct { - maxGoRoutines int - numRequests int - apiKey string - startVersion int - continuous bool - balloonVersion uint64 - counter float64 - delay_ms time.Duration - req HTTPClient -} - -type HTTPClient struct { - client *http.Client - method string - endpoint string - expectedStatusCode int -} +var ( + // Client -// type Config map[string]interface{} -func NewDefaultConfig() *Config { - - return &Config{ - maxGoRoutines: 10, - numRequests: numRequests, - apiKey: apiKey, - startVersion: 0, - continuous: false, - balloonVersion: uint64(numRequests) - 1, - counter: 0, - req: HTTPClient{ - client: nil, - method: "POST", - endpoint: endpoint, - expectedStatusCode: 200, + RiotEventAdd = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "riot_event_add", + Help: "Number of events added into the cluster.", + }, + ) + RiotQueryMembership = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "riot_query_membership", + Help: "Number of single events directly verified into the cluster.", }, + ) + RiotQueryIncremental = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "riot_query_incremental", + Help: "Number of range of verified events queried into the cluster.", + }, + ) + metricsList = []prometheus.Collector{ + RiotEventAdd, + RiotQueryMembership, + RiotQueryIncremental, } -} - -type Task func(goRoutineId int, c *Config) ([]byte, error) -// func (t *Task) Timeout()event -func SpawnerOfEvil(c *Config, t Task) { - var wg sync.WaitGroup + registerMetrics sync.Once +) - for goRoutineId := 0; goRoutineId < c.maxGoRoutines; goRoutineId++ { - wg.Add(1) - go func(goRoutineId int) { - defer wg.Done() - Attacker(goRoutineId, c, t) - }(goRoutineId) - } - wg.Wait() +// Register all metrics. +func Register(r *prometheus.Registry) { + // Register the metrics. + registerMetrics.Do( + func() { + for _, metric := range metricsList { + r.MustRegister(metric) + } + }, + ) } -func Attacker(goRoutineId int, c *Config, f func(j int, c *Config) ([]byte, error)) { +type Riot struct { + Config Config - for eventIndex := c.startVersion + goRoutineId; eventIndex < c.startVersion+c.numRequests || c.continuous; eventIndex += c.maxGoRoutines { - query, err := f(eventIndex, c) - if len(query) == 0 { - log.Fatalf("Empty query: %v", err) - } + metricsServer *http.Server + prometheusRegistry *prometheus.Registry +} - req, err := http.NewRequest(c.req.method, c.req.endpoint, bytes.NewBuffer(query)) - if err != nil { - log.Fatalf("Error preparing request: %v", err) - } +type Config struct { + // general conf + Endpoint string + APIKey string + Insecure bool + + // stress conf + Kind string + Offload bool + Profiling bool + IncrementalDelta uint + Offset uint + NumRequests uint + MaxGoRoutines uint + ClusterSize uint +} - // Set Api-Key header - req.Header.Set("Api-Key", c.apiKey) - res, err := c.req.client.Do(req) - if err != nil { - log.Fatalf("Unable to perform request: %v", err) - } +type Plan [][]Config - if res.StatusCode != c.req.expectedStatusCode { - log.Fatalf("Server error: %v", res) - } +type kind string - c.counter++ +const ( + add kind = "add" + membership kind = "membership" + incremental kind = "incremental" +) - _, _ = io.Copy(ioutil.Discard, res.Body) - res.Body.Close() +type Attack struct { + kind kind + balloonVersion uint64 - c.delay_ms = time.Duration(delay_ms) - time.Sleep(c.delay_ms * time.Millisecond) - } - c.counter = 0 + config Config + client *client.HTTPClient + senChan chan Task } -func addSampleEvents(eventIndex int, c *Config) ([]byte, error) { +type Task struct { + kind kind - return json.Marshal( - &protocol.Event{ - []byte(fmt.Sprintf("event %d", eventIndex)), - }, - ) + event string + key []byte + version, start, end uint64 } -func queryMembership(eventIndex int, c *Config) ([]byte, error) { - return json.Marshal( - &protocol.MembershipQuery{ - []byte(fmt.Sprintf("event %d", eventIndex)), - c.balloonVersion, - }, - ) +func main() { + if err := newRiotCommand().Execute(); err != nil { + os.Exit(-1) + } } -func queryIncremental(eventIndex int, c *Config) ([]byte, error) { - end := uint64(eventIndex) - start := uint64(math.Max(float64(eventIndex-incrementalDelta), 0.0)) - // start := end >> 1 - return json.Marshal( - &protocol.IncrementalRequest{ - Start: start, - End: end, - }, - ) -} +func newRiotCommand() *cobra.Command { + // Input storage. + var logLevel string + var APIMode bool + riot := Riot{} -func getVersion(eventTemplate string, c *Config) uint64 { - ssl := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } + cmd := &cobra.Command{ + Use: "riot", + Short: "Stresser tool for qed server", + PreRun: func(cmd *cobra.Command, args []string) { - client := &http.Client{Transport: ssl} + log.SetLogger("Riot", logLevel) - buf := fmt.Sprintf(eventTemplate) + if riot.Config.Profiling { + go func() { + log.Info(" * Starting Riot Profiling server") + log.Info(http.ListenAndServe(":6060", nil)) + }() + } - query, err := json.Marshal(&protocol.Event{[]byte(buf)}) - if len(query) == 0 { - log.Fatalf("Empty query: %v", err) + }, + Run: func(cmd *cobra.Command, args []string) { + riot.Start(APIMode) + }, } - req, err := http.NewRequest(c.req.method, c.req.endpoint, bytes.NewBuffer(query)) - if err != nil { - log.Fatalf("Error preparing request: %v", err) - } + f := cmd.Flags() - // Set Api-Key header - req.Header.Set("Api-Key", c.apiKey) - res, err := client.Do(req) - if err != nil { - log.Fatalf("Unable to perform request: %v", err) - } - defer res.Body.Close() + f.StringVarP(&logLevel, "log", "l", "debug", "Choose between log levels: silent, error, info and debug") + f.BoolVar(&APIMode, "api", false, "Raise a HTTP api in port 7700") - if res.StatusCode != 201 { - log.Fatalf("Server error: %v", err) - } + f.StringVar(&riot.Config.Endpoint, "endpoint", "http://localhost:8800", "The endopoint to make the load") + f.StringVarP(&riot.Config.APIKey, "apikey", "k", "my-key", "The key to use qed servers") + f.BoolVar(&riot.Config.Insecure, "insecure", false, "Allow self-signed TLS certificates") - body, _ := ioutil.ReadAll(res.Body) + f.StringVar(&riot.Config.Kind, "kind", "", "The kind of load to execute") - var signedSnapshot protocol.SignedSnapshot - json.Unmarshal(body, &signedSnapshot) - version := signedSnapshot.Snapshot.Version + f.BoolVar(&riot.Config.Profiling, "profiling", false, "Enable Go profiling $ go tool pprof") + f.UintVarP(&riot.Config.IncrementalDelta, "delta", "d", 1000, "Specify delta for the IncrementalProof") + f.UintVar(&riot.Config.NumRequests, "n", 10e4, "Number of requests for the attack") + f.UintVar(&riot.Config.MaxGoRoutines, "r", 10, "Set the concurrency value") + f.UintVar(&riot.Config.Offset, "offset", 0, "The starting version from which we start the load") - return version + return cmd } -type axis struct { - x, y []float64 -} +func (riot *Riot) Start(APIMode bool) { -func summary(message string, numRequestsf, elapsed float64, c *Config) { + r := prometheus.NewRegistry() + Register(r) + riot.prometheusRegistry = r + metricsMux := metricshttp.NewMetricsHTTP(r) + log.Debug(" * Starting Riot Metrics server") + riot.metricsServer = &http.Server{Addr: ":17700", Handler: metricsMux} - fmt.Printf( - "%s throughput: %.0f req/s: (%v reqs in %.3f seconds) | Concurrency: %d\n", - message, - numRequestsf/elapsed, - c.numRequests, - elapsed, - c.maxGoRoutines, - ) -} - -func summaryPerDuration(message string, numRequestsf, elapsed float64, c *Config) { + if APIMode { + riot.Serve() + } else { + riot.RunOnce() + } - fmt.Printf( - "%s throughput: %.0f req/s | Concurrency: %d | Elapsed time: %.3f seconds\n", - message, - c.counter/elapsed, - c.maxGoRoutines, - elapsed, - ) } -func stats(c *Config, t Task, message string) { - ticker := time.NewTicker(1 * time.Second) - numRequestsf := float64(c.numRequests) - start := time.Now() - defer ticker.Stop() - done := make(chan bool) - go func() { - SpawnerOfEvil(c, t) - elapsed := time.Now().Sub(start).Seconds() - fmt.Println("Task done.") - summary(message, numRequestsf, elapsed, c) - done <- true - }() - for { - select { - case <-done: - return - case t := <-ticker.C: - _ = t - elapsed := time.Now().Sub(start).Seconds() - summaryPerDuration(message, numRequestsf, elapsed, c) - } - } +func (riot *Riot) RunOnce() { + newAttack(riot.Config) } -func benchmarkAdd(numFollowers, numReqests, readConcurrency, writeConcurrency, offset int) { - fmt.Println("\nStarting benchmark run...") - - ssl := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +func postReqSanitizer(w http.ResponseWriter, r *http.Request) (http.ResponseWriter, *http.Request) { + if r.Method != "POST" { + w.Header().Set("Allow", "POST") + w.WriteHeader(http.StatusMethodNotAllowed) + return w, r } - client := &http.Client{Transport: ssl} - - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = writeConcurrency - c.startVersion = offset - c.req.expectedStatusCode = 201 - c.req.endpoint += "/events" - stats(c, addSampleEvents, "Add") + if r.Body == nil { + http.Error(w, "Please send a request body", http.StatusBadRequest) + } + return w, r +} +func (riot *Riot) MergeConf(newConf Config) Config { + conf := riot.Config + _ = mergo.Merge(&conf, newConf) + return conf } -func benchmarkMembership(numFollowers, numReqests, readConcurrency, writeConcurrency int) { - fmt.Println("\nStarting benchmark run...") - var queryWg sync.WaitGroup - ssl := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } +func (riot *Riot) Serve() { - client := &http.Client{Transport: ssl} - - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = writeConcurrency - c.req.expectedStatusCode = 201 - c.req.endpoint += "/events" - fmt.Println("PRELOAD") - stats(c, addSampleEvents, "Preload") - - config := make([]*Config, 0, numFollowers) - if numFollowers == 0 { - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = readConcurrency - c.req.expectedStatusCode = 200 - c.req.endpoint += "/proofs/membership" - - config = append(config, c) - } - for i := 0; i < numFollowers; i++ { - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = readConcurrency - c.req.expectedStatusCode = 200 - c.req.endpoint = fmt.Sprintf("http://localhost:%d", 8801+i) - c.req.endpoint += "/proofs/membership" - - config = append(config, c) - } + mux := http.NewServeMux() + mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { + w, r = postReqSanitizer(w, r) - time.Sleep(1 * time.Second) + var newConf Config + err := json.NewDecoder(r.Body).Decode(&newConf) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - fmt.Println("EXCLUSIVE QUERY MEMBERSHIP") - stats(config[0], queryMembership, "Follower-1-read") + newAttack(riot.MergeConf(newConf)) + }) - go hotParams(config) - fmt.Println("QUERY MEMBERSHIP UNDER CONTINUOUS LOAD") - for i, c := range config { - queryWg.Add(1) - go func(i int, c *Config) { - defer queryWg.Done() - stats(c, queryMembership, fmt.Sprintf("Follower-%d-read-mixed", i+1)) - }(i, c) - } + mux.HandleFunc("/plan", func(w http.ResponseWriter, r *http.Request) { + var wg sync.WaitGroup + w, r = postReqSanitizer(w, r) - fmt.Println("Starting continuous load...") - ca := NewDefaultConfig() - ca.req.client = client - ca.numRequests = numReqests - ca.maxGoRoutines = writeConcurrency - ca.req.expectedStatusCode = 201 - ca.req.endpoint += "/events" - ca.startVersion = c.numRequests - ca.continuous = true - - start := time.Now() - go stats(ca, addSampleEvents, "Leader-write-mixed") - queryWg.Wait() - elapsed := time.Now().Sub(start).Seconds() - - numRequestsf := float64(c.numRequests) - currentVersion := getVersion("last-event", c) - fmt.Printf( - "Leader write throughput: %.0f req/s: (%v reqs in %.3f seconds) | Concurrency: %d\n", - (float64(currentVersion)-numRequestsf)/elapsed, - currentVersion-uint64(c.numRequests), - elapsed, - c.maxGoRoutines, - ) -} - -func benchmarkIncremental(numFollowers, numReqests, readConcurrency, writeConcurrency int) { - fmt.Println("\nStarting benchmark run...") - var queryWg sync.WaitGroup - ssl := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: ssl} - - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = writeConcurrency - c.req.expectedStatusCode = 201 - c.req.endpoint += "/events" - - fmt.Println("PRELOAD") - stats(c, addSampleEvents, "Preload") - - config := make([]*Config, 0, numFollowers) - if numFollowers == 0 { - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = readConcurrency - c.req.expectedStatusCode = 200 - c.req.endpoint += "/proofs/incremental" - - config = append(config, c) - } - for i := 0; i < numFollowers; i++ { - c := NewDefaultConfig() - c.req.client = client - c.numRequests = numReqests - c.maxGoRoutines = readConcurrency - c.req.expectedStatusCode = 200 - c.req.endpoint = fmt.Sprintf("http://localhost:%d", 8801+i) - c.req.endpoint += "/proofs/incremental" - - config = append(config, c) - } + var plan Plan + err := json.NewDecoder(r.Body).Decode(&plan) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - time.Sleep(1 * time.Second) - fmt.Println("EXCLUSIVE QUERY INCREMENTAL") - stats(config[0], queryIncremental, "Follower-1-read") - - go hotParams(config) - fmt.Println("QUERY INCREMENTAL UNDER CONTINUOUS LOAD") - for i, c := range config { - queryWg.Add(1) - go func(i int, c *Config) { - defer queryWg.Done() - stats(c, queryIncremental, fmt.Sprintf("Follower-%d-read-mixed", i+1)) - }(i, c) - } + for _, batch := range plan { + for _, conf := range batch { + wg.Add(1) + go func(conf Config) { + newAttack(riot.MergeConf(conf)) + wg.Done() + }(conf) - fmt.Println("Starting continuous load...") - ca := NewDefaultConfig() - ca.req.client = client - ca.numRequests = numReqests - ca.maxGoRoutines = writeConcurrency - ca.req.expectedStatusCode = 201 - ca.req.endpoint += "/events" - ca.startVersion = c.numRequests - ca.continuous = true - - start := time.Now() - go stats(ca, addSampleEvents, "Leader-write-mixed") - queryWg.Wait() - elapsed := time.Now().Sub(start).Seconds() - - numRequestsf := float64(c.numRequests) - currentVersion := getVersion("last-event", c) - fmt.Printf( - "Leader-write-mixed throughput: %.0f req/s: (%v reqs in %.3f seconds) | Concurrency: %d\n", - (float64(currentVersion)-numRequestsf)/elapsed, - currentVersion-uint64(c.numRequests), - elapsed, - c.maxGoRoutines, - ) -} -func hotParams(config []*Config) { - scanner := bufio.NewScanner(os.Stdin) - - for scanner.Scan() { - value := scanner.Text() - - switch t := value[0:2]; t { - case "mr": - i, _ := strconv.ParseInt(value[2:], 10, 64) - d := time.Duration(i) - for _, c := range config { - c.delay_ms = d - } - fmt.Printf("Read throughtput set to: %d\n", i) - case "ir": - i, _ := strconv.ParseInt(value[2:], 10, 64) - d := time.Duration(i) - for _, c := range config { - c.delay_ms = d } - fmt.Printf("Read throughtput set to: %d\n", i) - default: - fmt.Println("Invalid command - Valid commands: mr100|ir200") + wg.Wait() } + }) + api := &http.Server{Addr: ":7700", Handler: mux} + + log.Debug(" * Starting Riot HTTP server") + if err := api.ListenAndServe(); err != http.ErrServerClosed { + log.Errorf("Can't start Riot API HTTP server: %s", err) } } -func main() { - var n int - switch m := os.Getenv("CLUSTER_SIZE"); m { - case "": - n = 0 - case "2": - n = 2 - case "4": - n = 4 - default: - fmt.Println("Error: CLUSTER_SIZE env var should have values 2 or 4, or not be defined at all.") - } +func newAttack(conf Config) { - flag.Parse() + cConf := client.DefaultConfig() + cConf.Endpoints = []string{conf.Endpoint} + cConf.APIKey = conf.APIKey + cConf.Insecure = conf.Insecure - if profiling { - go func() { - fmt.Print("Go profiling enabled\n") - log.Print(http.ListenAndServe(":6061", nil)) - }() + attack := Attack{ + client: client.NewHTTPClient(*cConf), + config: conf, + kind: kind(conf.Kind), + balloonVersion: uint64(conf.NumRequests + conf.Offset - 1), } - if offload { - n = n / 2 - fmt.Printf("Offload: %v | %d\n", offload, n) + if err := attack.client.Ping(); err != nil { + panic(err) } - if wantAdd { - fmt.Print("Benchmark ADD") - benchmarkAdd(n, numRequests, readConcurrency, writeConcurrency, offset) - } + attack.Run() +} + +func (a *Attack) Run() { + var wg sync.WaitGroup + a.senChan = make(chan Task) - if wantMembership { - fmt.Print("Benchmark MEMBERSHIP") - benchmarkMembership(n, numRequests, readConcurrency, writeConcurrency) + for rID := uint(0); rID < a.config.MaxGoRoutines; rID++ { + wg.Add(1) + go func(rID uint) { + for { + task, ok := <-a.senChan + if !ok { + log.Debugf("!!! clos: %d", rID) + wg.Done() + return + } + + switch task.kind { + case add: + log.Debugf(">>> add: %s", task.event) + _, _ = a.client.Add(task.event) + RiotEventAdd.Inc() + case membership: + log.Debugf(">>> mem: %s, %d", task.event, task.version) + _, _ = a.client.Membership(task.key, task.version) + RiotQueryMembership.Inc() + case incremental: + log.Debugf(">>> inc: %s", task.event) + _, _ = a.client.Incremental(task.start, task.end) + RiotQueryIncremental.Inc() + } + } + }(rID) } - if wantIncremental { - fmt.Print("Benchmark INCREMENTAL") - benchmarkIncremental(n, numRequests, readConcurrency, writeConcurrency) + for i := a.config.Offset; i < a.config.Offset+a.config.NumRequests; i++ { + ev := fmt.Sprintf("event %d", i) + a.senChan <- Task{ + kind: a.kind, + event: ev, + key: []byte(ev), + version: a.balloonVersion, + start: uint64(i), + end: uint64(i + a.config.IncrementalDelta), + } } + + close(a.senChan) + wg.Wait() } diff --git a/tests/riot/main.go b/tests/riot/main.go deleted file mode 100644 index 0dba21812..000000000 --- a/tests/riot/main.go +++ /dev/null @@ -1,350 +0,0 @@ -/* - Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package main - -import ( - "encoding/json" - "fmt" - "net/http" - "os" - "time" - - "github.com/imdario/mergo" - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/bbva/qed/client" - "github.com/bbva/qed/log" -) - -type Config struct { - // general conf - Endpoint string - APIKey string - Insecure bool - - // load kinds - Add bool - Incremental bool - Membership bool - - // stress conf - Offload bool - Profiling bool - IncrementalDelta uint - Offset uint - NumRequests uint - MaxGoRoutines uint - ClusterSize uint - - // metrics - counter float64 -} - -func main() { - if err := newRiotCommand().Execute(); err != nil { - os.Exit(-1) - } -} - -func newRiotCommand() *cobra.Command { - // Input storage. - var logLevel string - var APIMode bool - config := Config{} - - cmd := &cobra.Command{ - Use: "riot", - Short: "Stresser tool for qed server", - PreRun: func(cmd *cobra.Command, args []string) { - - log.SetLogger("Riot", logLevel) - - config.ClusterSize = uint(viper.GetInt("cluster_size")) - - if config.ClusterSize != 0 && config.ClusterSize != 2 && config.ClusterSize != 4 { - log.Fatalf("invalid cluster-size specified: %d (only 2 or 4)", config.ClusterSize) - } - - }, - Run: func(cmd *cobra.Command, args []string) { - if config.Profiling { - go func() { - log.Info("Go profiling enabled\n") - log.Info(http.ListenAndServe(":6061", nil)) - }() - } - - if APIMode { - Serve(config) - } else { - Run(config) - } - - }, - } - - f := cmd.Flags() - - f.StringVarP(&logLevel, "log", "l", "debug", "Choose between log levels: silent, error, info and debug") - f.BoolVar(&APIMode, "api", false, "Raise a HTTP api in port 11111 ") - - f.StringVar(&config.Endpoint, "endpoint", "http://localhost:8800", "The endopoint to make the load") - f.StringVar(&config.APIKey, "apikey", "my-key", "The key to use qed servers") - f.BoolVar(&config.Insecure, "insecure", false, "Allow self-signed TLS certificates") - f.BoolVar(&config.Add, "add", false, "Execute add benchmark") - f.BoolVarP(&config.Membership, "membership", "m", false, "Benchmark MembershipProof") - f.BoolVar(&config.Incremental, "incremental", false, "Execute Incremental benchmark") - f.BoolVar(&config.Offload, "offload", false, "Perform reads only on %50 of the cluster size (With cluster size 2 reads will be performed only on follower1)") - f.BoolVar(&config.Profiling, "profiling", false, "Enable Go profiling with pprof tool. $ go tool pprof -http : http://localhost:6061 ") - f.UintVarP(&config.IncrementalDelta, "delta", "d", 1000, "Specify delta for the IncrementalProof") - f.UintVar(&config.NumRequests, "n", 10e4, "Number of requests for the attack") - f.UintVar(&config.MaxGoRoutines, "r", 10, "Set the concurrency value") - f.UintVar(&config.Offset, "offset", 0, "The starting version from which we start the load") - f.UintVar(&config.ClusterSize, "cluster-size", 0, "") - - _ = viper.BindPFlag("cluster_size", f.Lookup("cluster-size")) - _ = viper.BindEnv("cluster_size", "CLUSTER_SIZE") - - return cmd -} - -func Run(paramsConf Config) { - setupMetrics(paramsConf) - newAttack(paramsConf) -} - -func Serve(paramsConf Config) { - - setupMetrics(paramsConf) - - mux := http.NewServeMux() - mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - w.Header().Set("Allow", "POST") - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - if r.Body == nil { - http.Error(w, "Please send a request body", http.StatusBadRequest) - return - } - - var newConf Config - err := json.NewDecoder(r.Body).Decode(&newConf) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var conf Config - if err := mergo.Merge(&conf, paramsConf); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if err := mergo.Merge(&conf, newConf); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - go newAttack(conf) - - }) - - api := &http.Server{ - Addr: ":18800", - Handler: mux, - } - - log.Debug(" * Starting Riot HTTP server") - if err := api.ListenAndServe(); err != http.ErrServerClosed { - log.Errorf("Can't start Riot API HTTP server: %s", err) - } -} - -type kind string - -const ( - add kind = "add" - membership kind = "membership" - incremental kind = "incremental" -) - -func newAttack(conf Config) { - var attack Attack - - if conf.Add { // nolint:gocritic - log.Info("Benchmark ADD") - attack = Attack{ - kind: add, - } - } else if conf.Membership { - log.Info("Benchmark MEMBERSHIP") - - attack = Attack{ - kind: membership, - balloonVersion: uint64(conf.NumRequests + conf.Offset - 1), - } - } else if conf.Incremental { - log.Info("Benchmark INCREMENTAL") - - attack = Attack{ - kind: incremental, - } - } - - attack.config = conf - - attack.Run() -} - -type Attack struct { - kind kind - balloonVersion uint64 - - config Config - client *client.HTTPClient - reqChan chan uint - senChan chan Task -} - -type Task struct { - kind kind - - event string - key []byte - version, start, end uint64 -} - -func (a *Attack) Run() { - a.CreateFanOut() - a.CreateFanIn() - - for i := a.config.Offset; i < a.config.Offset+a.config.NumRequests; i++ { - a.reqChan <- i - } - -} -func (a *Attack) Shutdown() { - close(a.reqChan) - close(a.senChan) -} - -func (a *Attack) CreateFanIn() { - a.reqChan = make(chan uint, a.config.NumRequests/100) - - for rID := uint(0); rID < a.config.MaxGoRoutines; rID++ { - go func(rID uint) { - for { - id, ok := <-a.reqChan - if !ok { - log.Infof("Closing mux chan #%d", rID) - return - } - switch a.kind { - case add: - a.senChan <- Task{ - kind: a.kind, - event: fmt.Sprintf("event %d", id), - } - case membership: - a.senChan <- Task{ - kind: a.kind, - key: []byte(fmt.Sprintf("event %d", id)), - version: a.balloonVersion, - } - case incremental: - a.senChan <- Task{ - kind: a.kind, - start: uint64(id), - end: uint64(id + a.config.IncrementalDelta), - } - } - a.config.counter++ - } - }(rID) - } - -} - -func (a *Attack) CreateFanOut() { - - cConf := client.DefaultConfig() - cConf.Endpoints = []string{a.config.Endpoint} - cConf.APIKey = a.config.APIKey - cConf.Insecure = a.config.Insecure - a.client = client.NewHTTPClient(*cConf) - if err := a.client.Ping(); err != nil { - panic(err) - } - - a.senChan = make(chan Task, a.config.NumRequests/100) - - for rID := uint(0); rID < a.config.MaxGoRoutines; rID++ { - - go func(rID uint) { - for { - task, ok := <-a.senChan - if !ok { - log.Infof("Closing demux chan #%d", rID) - return - } - - switch task.kind { - case add: - _, _ = a.client.Add(task.event) - case membership: - _, _ = a.client.Membership(task.key, task.version) - case incremental: - _, _ = a.client.Incremental(task.start, task.end) - } - a.config.counter++ - } - }(rID) - } -} - -func setupMetrics(conf Config) { - ticker := time.NewTicker(1 * time.Second) - start := time.Now() - defer ticker.Stop() - - go func() { - for { - <-ticker.C - elapsed := time.Since(start).Seconds() - summaryPerDuration(conf, elapsed) - } - }() - -} - -func summaryPerDuration(conf Config, elapsed float64) { - - log.Infof( - "Throughput: %.0f req/s | Concurrency: %d | Elapsed time: %.3f seconds\n", - conf.counter/elapsed, - conf.MaxGoRoutines, - elapsed, - ) -} - -type axis struct { - x, y []float64 -}