-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexecutor_test.go
223 lines (185 loc) · 5.61 KB
/
executor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package block_stm
import (
"encoding/binary"
"fmt"
"github.com/stretchr/testify/require"
"math/rand"
"testing"
"time"
)
func TestSimpleDependency(t *testing.T) {
// assume two transactions:
// . tx1 reads from path1 and writes to path2
// . tx2 reads from path2 and writes to path3
p1 := []byte("/foo/1")
p2 := []byte("/foo/2")
p3 := []byte("/foo/3")
mvh := MakeMVHashMap()
// assume these two tasks happen in parallel ...
// ... but second tx doesn't 'see' tx1's write to p2
res2 := mvh.Read(p2, 2)
require.Equal(t, mvReadResultNone, res2.status())
mvh.Write(p3, Version{2, 1}, valueFor(2, 1))
res1 := mvh.Read(p1, 1)
require.Equal(t, mvReadResultNone, res1.status())
mvh.Write(p2, Version{1, 1}, valueFor(1, 1))
lastTxIO := MakeTxnInputOutput(3) // assume there's a tx0 :)
// recordRead read deps of tx2
inp2 := []ReadDescriptor{{p2, ReadKindStorage, Version{2, 1}}}
lastTxIO.recordRead(2, inp2)
valid := validateVersion(2, lastTxIO, mvh)
require.False(t, valid, "tx2 sees dependency on tx1 write") // would cause re-exec and re-validation of tx2
// tx2 now 're-executes' - new incarnation
res2 = mvh.Read(p2, 2)
require.Equal(t, mvReadResultDone, res2.status(), "tx2 now sees 'done' write of tx1 to p2")
mvh.Write(p3, Version{2, 2}, valueFor(2, 2))
inp2 = []ReadDescriptor{{p2, ReadKindMap, Version{2, 2}}}
lastTxIO.recordRead(2, inp2)
valid = validateVersion(2, lastTxIO, mvh)
require.True(t, valid, "tx2 is complete since dep on tx1 is satisfied")
}
type testExecTask struct {
num int
wait time.Duration
}
type testIndependentExecTask struct {
testExecTask
}
func (t testIndependentExecTask) Execute(rw BaseReadWrite) error {
time.Sleep(t.wait)
if _, err := rw.Read([]byte(fmt.Sprintf("test-key-%v", t.num))); err != nil {
return err
}
return rw.Write([]byte(fmt.Sprintf("test-key-%v", t.num)), []byte(fmt.Sprintf("test-val-%v", t.num)))
}
type testSerialExecTask struct {
testExecTask
}
func (t testSerialExecTask) Execute(rw BaseReadWrite) error {
time.Sleep(t.wait)
if _, err := rw.Read([]byte(fmt.Sprintf("test-key-%v", t.num))); err != nil {
return err
}
return rw.Write([]byte(fmt.Sprintf("test-key-%v", t.num+1)), []byte(fmt.Sprintf("test-val-%v", t.num+1)))
}
type testConflictExecTask struct {
testExecTask
}
// all tasks read and write to the same path so execution has to be 100% serial
// this simulates each task reading from and incrementing the same counter
func (t testConflictExecTask) Execute(rw BaseReadWrite) error {
var cnt uint32
time.Sleep(t.wait)
if v, err := rw.Read([]byte("test-key-0")); err != nil {
return err
} else {
cnt = binary.BigEndian.Uint32(v)
}
var b [4]byte
cnt++
binary.BigEndian.PutUint32(b[:], cnt)
return rw.Write([]byte("test-key-0"), b[:])
}
var _ ExecTask = &testSerialExecTask{}
var _ ExecTask = &testConflictExecTask{}
type testBaseReadWrite struct {
}
func (t testBaseReadWrite) Read(k []byte) (v []byte, error error) {
v = make([]byte, 4)
binary.BigEndian.PutUint32(v, 0)
return
}
func (t testBaseReadWrite) Write(k, v []byte) error {
// this is just a NOP for testing purposes ...?
return nil
}
var _ BaseReadWrite = &testBaseReadWrite{}
func validateIndependentTxOutput(txIO *TxnInputOutput) bool {
seq := uint32(0)
for _, v := range txIO.outputs {
checkVal := string(v[0].Val)
if fmt.Sprintf("test-val-%v", seq) != checkVal {
return false
}
seq++
}
return true
}
func TestIndependentParallel(t *testing.T) {
var exec []ExecTask
var totalTaskDuration time.Duration
for i := 0; i < 100; i++ {
t := testIndependentExecTask{
testExecTask: testExecTask{
num: i,
wait: time.Duration(rand.Intn(10)+10) * time.Millisecond,
},
}
exec = append(exec, t)
totalTaskDuration += t.wait
}
testParallelScenario(t, exec, totalTaskDuration, validateIndependentTxOutput)
}
func validateConflictTxOutput(txIO *TxnInputOutput) bool {
seq := uint32(1)
for _, v := range txIO.outputs {
if binary.BigEndian.Uint32(v[0].Val) != seq {
return false
}
seq++
}
return true
}
func TestConflictParallel(t *testing.T) {
var exec []ExecTask
var totalTaskDuration time.Duration
for i := 0; i < 100; i++ {
t := testConflictExecTask{
testExecTask: testExecTask{
num: i,
wait: time.Duration(rand.Intn(10)+10) * time.Millisecond,
},
}
exec = append(exec, t)
totalTaskDuration += t.wait
}
testParallelScenario(t, exec, totalTaskDuration, validateConflictTxOutput)
}
func validateSerialTxOutput(txIO *TxnInputOutput) bool {
seq := uint32(1)
for _, v := range txIO.outputs {
checkVal := string(v[0].Val)
if fmt.Sprintf("test-val-%v", seq) != checkVal {
return false
}
seq++
}
return true
}
func TestSerialParallel(t *testing.T) {
var exec []ExecTask
var totalTaskDuration time.Duration
for i := 0; i < 100; i++ {
t := testSerialExecTask{
testExecTask: testExecTask{
num: i,
wait: time.Duration(rand.Intn(10)+10) * time.Millisecond,
},
}
exec = append(exec, t)
totalTaskDuration += t.wait
}
testParallelScenario(t, exec, totalTaskDuration, validateSerialTxOutput)
}
func testParallelScenario(t *testing.T, exec []ExecTask, totalTaskDuration time.Duration, validateTxIO func(txIO *TxnInputOutput) bool) {
var rw testBaseReadWrite
start := time.Now()
txIO, err := ExecuteParallel(exec, &rw)
execDuration := time.Since(start)
require.NoError(t, err)
// with base parallelism and incomplete validation logic:
// . 100-200 ms tasks: exec duration 1.604971583s, total duration 14.799s
// . 10-20 ms tasks: exec duration 160.18275ms, total duration 1.469s
println(fmt.Sprintf("exec duration %v, total duration %v", execDuration, totalTaskDuration))
require.True(t, validateTxIO(txIO))
}