-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline_test.go
75 lines (60 loc) · 1.44 KB
/
pipeline_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package pipeline
import (
"bytes"
"fmt"
"io"
"strconv"
"testing"
)
// testData is the record type used as a fixture by the tests in this file.
// The tests hand *testData to the pipeline as an Emitter, and its fields are
// populated from JSON objects of the form {"key": ..., "value": ...}.
type testData struct {
	// K holds the record key, read from the JSON "key" field.
	K string `json:"key"`
	// V holds the record's integer value, read from the JSON "value" field.
	V int `json:"value"`
}
// Where unconditionally reports true for every record.
// NOTE(review): presumably this means test records are never filtered out by
// the pipeline — confirm against the interface that declares Where.
func (t *testData) Where() bool {
	return true
}
// Emit writes the record to w as a single line of the form "K<TAB>V\n" and
// returns any error reported by the writer.
func (t *testData) Emit(w io.Writer) error {
	// Fprintf formats straight into w, avoiding the intermediate string and
	// []byte copy that Sprintf followed by Write would create.
	_, err := fmt.Fprintf(w, "%s\t%v\n", t.K, t.V)
	return err
}
// Key returns the record's key, i.e. the K field unchanged.
func (t *testData) Key() string {
	return t.K
}
// Value returns the record's integer value V rendered as a base-10 string.
func (t *testData) Value() string {
	return strconv.Itoa(t.V)
}
// Sum adds the V field of every element of s to the receiver's V, modifying
// the receiver in place. Each element must hold a *testData; any other
// dynamic type makes the type assertion panic.
func (t *testData) Sum(s ...Emitter) {
	for i := range s {
		other := s[i].(*testData)
		t.V += other.V
	}
}
// TestMapper feeds one JSON record through a mapper built from NewMap and
// NewJSONDeserializer (with *testData as the record type) and checks the text
// that ends up in the output buffer.
func TestMapper(t *testing.T) {
	const (
		input = "{\"key\":\"test\",\"value\":1}\n"
		want  = "test\t1\n"
	)

	in := bytes.NewBufferString(input)
	out := &bytes.Buffer{}

	mapper := NewMap(NewJSONDeserializer(func() Emitter {
		return &testData{}
	}))
	mapper.In(in).Out(out)

	// Report the mismatch through the testing framework, with both values
	// quoted so tabs and newlines are visible, rather than via the println
	// builtin, which writes to stderr outside the test's own log.
	if got := out.String(); got != want {
		t.Errorf("mapper output = %q, want %q", got, want)
	}
}
// TestReducer feeds two JSON records that share the key "test" (each with
// value 1) through a reducer built from NewReduce and NewJSONDeserializer, and
// checks that the output buffer holds a single line with the combined value 2.
func TestReducer(t *testing.T) {
	const (
		input = "{\"key\":\"test\",\"value\":1}\n{\"key\":\"test\",\"value\":1}\n"
		want  = "test\t2\n"
	)

	in := bytes.NewBufferString(input)
	out := &bytes.Buffer{}

	reducer := NewReduce(NewJSONDeserializer(func() Emitter {
		return &testData{}
	}))
	reducer.In(in).Out(out)

	// Report the mismatch through the testing framework, with both values
	// quoted so tabs and newlines are visible, rather than via the println
	// builtin, which writes to stderr outside the test's own log.
	if got := out.String(); got != want {
		t.Errorf("reducer output = %q, want %q", got, want)
	}
}