Merge branch 'bugfix/issue-80'
* bugfix/issue-80:
  fix: math in olric-load tool
  feat: add map of bootstrapped cluster members
buraksezer committed Feb 17, 2021
2 parents 3740c81 + 53a459c commit b2202d6
Showing 5 changed files with 104 additions and 43 deletions.
21 changes: 16 additions & 5 deletions README.md
@@ -406,17 +406,28 @@ In order to install `olric-load`:
go get -u github.com/buraksezer/olric/cmd/olric-load
```

The following command calls the `Put` command for 100000 keys on `127.0.0.1:3320` (the default) and uses `msgpack` for serialization.
The following command calls the `Put` command for 1M keys and uses `msgpack` for serialization; the `-a` flag overrides the default address of `127.0.0.1:3320`.

```
olric-load -c put -s msgpack -k 100000
olric-load -a 192.168.1.3:3320 -s msgpack -k 1000000 -c put
### STATS FOR COMMAND: PUT ###
Serializer is msgpack
100000 requests completed in 1.209334678s
1000000 requests completed in 6.943316278s
50 parallel clients
93% <= 1 milliseconds
5% <= 2 milliseconds
98.36% <= 0 milliseconds
99.50% <= 1 milliseconds
99.79% <= 2 milliseconds
99.91% <= 3 milliseconds
99.95% <= 4 milliseconds
99.96% <= 5 milliseconds
99.96% <= 6 milliseconds
99.97% <= 7 milliseconds
99.98% <= 10 milliseconds
99.99% <= 15 milliseconds
100.00% <= 96 milliseconds
144023.397460 requests per second
```
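For reference, the reported throughput is simply the total request count divided by the wall-clock time: 1,000,000 requests / 6.943316278 s ≈ 144,023 requests per second, matching the last line of the output above.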

In order to get more details about the command, call `olric-load -h`.
78 changes: 42 additions & 36 deletions cmd/olric-load/loader/loader.go
@@ -17,7 +17,7 @@ package loader
import (
	"fmt"
	"log"
	"sort"
	"math"
	"strconv"
	"strings"
	"sync"
@@ -29,15 +29,15 @@ import (
)

type Loader struct {
	mu         sync.RWMutex
	responses  []int
	commands   []string
	keyCount   int
	numClients int
	serializer string
	client     *client.Client
	log        *log.Logger
	wg         sync.WaitGroup
	mu          sync.RWMutex
	responses   []time.Duration
	commands    []string
	numRequests int
	numClients  int
	serializer  string
	client      *client.Client
	log         *log.Logger
	wg          sync.WaitGroup
}

func New(addrs, timeout, serializer string,
@@ -71,12 +71,12 @@ func New(addrs, timeout, serializer string,
		return nil, err
	}
	l := &Loader{
		responses:  []int{},
		keyCount:   keyCount,
		numClients: numClients,
		client:     c,
		serializer: serializer,
		log:        logger,
		responses:   []time.Duration{},
		numRequests: keyCount,
		numClients:  numClients,
		client:      c,
		serializer:  serializer,
		log:         logger,
	}
	return l, nil
}
@@ -87,30 +87,36 @@ func (l *Loader) stats(cmd string, elapsed time.Duration) {

l.log.Printf("### STATS FOR COMMAND: %s ###", strings.ToUpper(cmd))
l.log.Printf("Serializer is %s", l.serializer)
l.log.Printf("%d requests completed in %v", l.keyCount, elapsed)
l.log.Printf("%d requests completed in %v", l.numRequests, elapsed)
l.log.Printf("%d parallel clients", l.numClients)
l.log.Printf("\n")

var ms int = 1000000
result := make(map[int]int)
for _, t := range l.responses {
result[(t/ms)+1]++
}
var keys []int
for key, _ := range result {
keys = append(keys, key)
}
sort.Ints(keys)
for _, key := range keys {
count := result[key]
percentage := count * 100 / len(l.responses)
if percentage > 0 {
fmt.Printf("%4d%%%2s<=%3d milliseconds\n", percentage, "", key)
var limit time.Duration
var lastper float64
for {
limit += time.Millisecond
var hits, count int
for _, rtime := range l.responses {
if rtime < limit {
hits++
}
count++
}
per := float64(hits) / float64(count)
if math.Floor(per*10000) == math.Floor(lastper*10000) {
continue
}
lastper = per
fmt.Printf("%.2f%% <= %d milliseconds\n", per*100, (limit-time.Millisecond)/time.Millisecond)
if per == 1.0 {
break
}
}
rps := float64(l.numRequests) / (float64(elapsed) / float64(time.Second))
l.log.Printf("\n%f requests per second\n", rps)
}

func (l *Loader) call(cmd string, ch chan int) {
func (l *Loader) worker(cmd string, ch chan int) {
	defer l.wg.Done()

	dm := l.client.NewDMap("olric-load-test")
@@ -145,7 +151,7 @@

		response := time.Since(now)
		l.mu.Lock()
		l.responses = append(l.responses, int(response))
		l.responses = append(l.responses, response)
		l.mu.Unlock()
	}
}
@@ -168,11 +174,11 @@ func (l *Loader) Run(cmd string) error {
	ch := make(chan int)
	for i := 0; i < l.numClients; i++ {
		l.wg.Add(1)
		go l.call(cmd, ch)
		go l.worker(cmd, ch)
	}

	now := time.Now()
	for i := 0; i < l.keyCount; i++ {
	for i := 0; i < l.numRequests; i++ {
		ch <- i
	}
	close(ch)
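The "fix: math in olric-load tool" part of this merge appears to be the reporting change above: instead of bucketing responses into per-millisecond counts, the loop now walks millisecond bounds and prints the cumulative share of requests that finished under each bound, skipping bounds that do not move the percentage. Below is a minimal, self-contained sketch of that idea; the function name, the `lastPer` spelling, and the sample latencies are illustrative and not taken from the commit.

```
package main

import (
	"fmt"
	"math"
	"time"
)

// printCumulative prints, for increasing millisecond bounds, the cumulative
// percentage of responses that completed under each bound. Bounds that do not
// change the percentage at two-decimal precision are skipped.
func printCumulative(responses []time.Duration) {
	if len(responses) == 0 {
		return
	}
	var limit time.Duration
	var lastPer float64
	for {
		limit += time.Millisecond
		var hits int
		for _, rt := range responses {
			if rt < limit {
				hits++
			}
		}
		per := float64(hits) / float64(len(responses))
		if math.Floor(per*10000) != math.Floor(lastPer*10000) {
			lastPer = per
			fmt.Printf("%.2f%% <= %d milliseconds\n", per*100, (limit-time.Millisecond)/time.Millisecond)
		}
		if per == 1.0 {
			break
		}
	}
}

func main() {
	// Hypothetical latencies, not measured output.
	printCumulative([]time.Duration{
		300 * time.Microsecond,
		800 * time.Microsecond,
		2 * time.Millisecond,
		7 * time.Millisecond,
	})
}
```

With those four sample latencies the sketch prints 50.00% <= 0, 75.00% <= 2, and 100.00% <= 7 milliseconds, in the same format as the README output.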
13 changes: 11 additions & 2 deletions stats.go
@@ -18,6 +18,7 @@ import (
"os"
"runtime"

"github.com/buraksezer/olric/internal/discovery"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/stats"
"github.com/vmihailenco/msgpack"
@@ -38,10 +39,18 @@ func (db *Olric) stats() stats.Stats {
			NumGoroutine: runtime.NumGoroutine(),
			MemStats:     *mem,
		},
		Partitions: make(map[uint64]stats.Partition),
		Backups:    make(map[uint64]stats.Partition),
		Partitions:     make(map[uint64]stats.Partition),
		Backups:        make(map[uint64]stats.Partition),
		ClusterMembers: make(map[uint64]discovery.Member),
	}

	db.members.mtx.RLock()
	for id, member := range db.members.m {
		// List of bootstrapped cluster members
		s.ClusterMembers[id] = member
	}
	db.members.mtx.RUnlock()

	collect := func(partID uint64, part *partition) stats.Partition {
		owners := part.loadOwners()
		p := stats.Partition{
1 change: 1 addition & 0 deletions stats/stats.go
@@ -72,4 +72,5 @@ type Stats struct {
	ClusterCoordinator discovery.Member
	Partitions         map[uint64]Partition
	Backups            map[uint64]Partition
	ClusterMembers     map[uint64]discovery.Member
}
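The new `ClusterMembers` field maps member IDs to the bootstrapped `discovery.Member` values, as the tests below verify. As a rough usage sketch for embedded-member callers (the helper name and the assumption of an already-created and started `*olric.Olric` instance are mine, not part of the commit):

```
package example

import (
	"fmt"

	"github.com/buraksezer/olric"
)

// PrintClusterMembers lists the bootstrapped cluster members reported by Stats.
// db is assumed to be an already-created and started Olric instance.
func PrintClusterMembers(db *olric.Olric) error {
	s, err := db.Stats()
	if err != nil {
		return err
	}
	for id, member := range s.ClusterMembers {
		// ClusterMembers is keyed by the member's ID.
		fmt.Printf("member %d => %s\n", id, member)
	}
	return nil
}
```

On a single standalone node this should print exactly one entry, which is also the cluster coordinator, mirroring the standalone test below.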
34 changes: 34 additions & 0 deletions stats_test.go
@@ -16,7 +16,10 @@ package olric

import (
"context"
"reflect"
"testing"

"github.com/buraksezer/olric/internal/discovery"
)

func TestStatsStandalone(t *testing.T) {
@@ -51,6 +54,17 @@ func TestStatsStandalone(t *testing.T) {
t.Fatalf("Expected cluster coordinator: %v. Got: %v", db.this, s.ClusterCoordinator)
}

	if len(s.ClusterMembers) != 1 {
		t.Fatalf("Expected length of ClusterMembers map: 1. Got: %d", len(s.ClusterMembers))
	}
	m, ok := s.ClusterMembers[s.ClusterCoordinator.ID]
	if !ok {
		t.Fatalf("Member could not be found in ClusterMembers")
	}
	if !reflect.DeepEqual(m, s.ClusterCoordinator) {
		t.Fatalf("Different member for the same ID")
	}

	var total int
	for partID, part := range s.Partitions {
		total += part.Length
@@ -140,4 +154,24 @@ func TestStatsCluster(t *testing.T) {
t.Fatalf("Expected cluster coordinator: %v. Got: %v", db1.this, s.ClusterCoordinator)
}
})

t.Run("check ClusterMembers", func(t *testing.T) {
s, err := db2.Stats()
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
if len(s.ClusterMembers) != 2 {
t.Fatalf("Expected length of ClusterMembers map: 2. Got: %d", len(s.ClusterMembers))
}

for _, member := range []discovery.Member{db1.this, db2.this} {
m, ok := s.ClusterMembers[member.ID]
if !ok {
t.Fatalf("Member: %s could not be found in ClusterMembers", member)
}
if !reflect.DeepEqual(member, m) {
t.Fatalf("Different member for the same ID: %s", member)
}
}
})
}
