Skip to content

Commit

Permalink
Merge pull request #8510 from gyuho/txn-stresser
Browse files Browse the repository at this point in the history
etcd-tester: add txn stresser
  • Loading branch information
gyuho authored Jan 9, 2018
2 parents bcc5ea6 + 3810509 commit 52f73c5
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 18 deletions.
3 changes: 3 additions & 0 deletions test
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ function functional_pass {
-peer-ports 12380,22380,32380 \
-limit 1 \
-schedule-cases "0 1 2 3 4 5" \
-stress-qps 1000 \
-stress-key-txn-count 100 \
-stress-key-txn-ops 10 \
-exit-on-failure && echo "'etcd-tester' succeeded"
ETCD_TESTER_EXIT_CODE=$?
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
Expand Down
90 changes: 87 additions & 3 deletions tools/functional-tester/etcd-tester/key_stresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
type keyStresser struct {
Endpoint string

keyLargeSize int
keySize int
keySuffixRange int
keyLargeSize int
keySize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int

N int

Expand Down Expand Up @@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error {
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
}
if s.keyTxnSuffixRange > 0 {
// adjust to make up ±70% of workloads with writes
stressEntries[0].weight = 0.24
stressEntries[1].weight = 0.24
stressEntries = append(stressEntries, stressEntry{
weight: 0.24,
f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
})
}
s.stressTable = createStressTable(stressEntries)

for i := 0; i < s.N; i++ {
Expand Down Expand Up @@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
}
}

func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
keys := make([]string, keyTxnSuffixRange)
for i := range keys {
keys[i] = fmt.Sprintf("/k%03d", i)
}
return writeTxn(kvc, keys, txnOps)
}

func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
return func(ctx context.Context) (error, int64) {
ks := make(map[string]struct{}, txnOps)
for len(ks) != txnOps {
ks[keys[rand.Intn(64)]] = struct{}{}
}
selected := make([]string, 0, txnOps)
for k := range ks {
selected = append(selected, k)
}
com, delOp, putOp := getTxnReqs(selected[0], "bar00")
txnReq := &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
}

// add nested txns if any
for i := 1; i < txnOps; i++ {
k, v := selected[i], fmt.Sprintf("bar%02d", i)
com, delOp, putOp = getTxnReqs(k, v)
nested := &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
},
},
}
txnReq.Success = append(txnReq.Success, nested)
txnReq.Failure = append(txnReq.Failure, nested)
}

_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
return err, int64(txnOps)
}
}

func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
// if key exists (version > 0)
com = &pb.Compare{
Key: []byte(key),
Target: pb.Compare_VERSION,
Result: pb.Compare_GREATER,
TargetUnion: &pb.Compare_Version{Version: 0},
}
delOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte(key),
},
},
}
putOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte(key),
Value: []byte(val),
},
},
}
return com, delOp, putOp
}

func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
_, err := kvc.Range(ctx, &pb.RangeRequest{
Expand Down
22 changes: 16 additions & 6 deletions tools/functional-tester/etcd-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func main() {
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
Expand Down Expand Up @@ -120,15 +122,23 @@ func main() {
}

scfg := stressConfig{
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
numLeases: 10,
keysPerLease: 10,
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
keyTxnOps: int(*stressKeyTxnOps),
numLeases: 10,
keysPerLease: 10,

etcdRunnerPath: *etcdRunnerPath,
}
if scfg.keyTxnSuffixRange > 100 {
plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
}
if scfg.keyTxnOps > 64 {
plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
}

t := &tester{
failures: schedule,
Expand Down
22 changes: 13 additions & 9 deletions tools/functional-tester/etcd-tester/stresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
}

type stressConfig struct {
keyLargeSize int
keySize int
keySuffixRange int
keyLargeSize int
keySize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int

numLeases int
keysPerLease int
Expand All @@ -142,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
// TODO: Too intensive stressers can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
return &keyStresser{
Endpoint: m.grpcAddr(),
keyLargeSize: sc.keyLargeSize,
keySize: sc.keySize,
keySuffixRange: sc.keySuffixRange,
N: 100,
rateLimiter: sc.rateLimiter,
Endpoint: m.grpcAddr(),
keyLargeSize: sc.keyLargeSize,
keySize: sc.keySize,
keySuffixRange: sc.keySuffixRange,
keyTxnSuffixRange: sc.keyTxnSuffixRange,
keyTxnOps: sc.keyTxnOps,
N: 100,
rateLimiter: sc.rateLimiter,
}
case "v2keys":
return &v2Stresser{
Expand Down

0 comments on commit 52f73c5

Please sign in to comment.