-
Notifications
You must be signed in to change notification settings - Fork 0
/
reduce.go
105 lines (93 loc) · 2.58 KB
/
reduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package pipeline
import "io"
// Reduce is a pipeline stage that reads its input through an io.PipeReader,
// performs a reduce operation on the deserialized records, and writes the
// result to the io.Writer handed to the Reduce method. Input reaches the
// stage through the paired io.PipeWriter.
type Reduce struct {
// w is the write side of the stage's pipe; Write, Close and In operate on it.
w *io.PipeWriter
// r is the read side of the stage's pipe; Read operates on it.
r *io.PipeReader
// DeserializerFunc is passed to deserializeInput by the Reduce method;
// presumably it decodes the raw input into records (defined elsewhere).
DeserializerFunc DeserializerFunc
}
// NewReduce returns a pointer to a new Reduce that uses d as its
// DeserializerFunc. The reader and writer of the returned value are the two
// ends of a freshly created io.Pipe.
func NewReduce(d DeserializerFunc) *Reduce {
	reader, writer := io.Pipe()

	stage := new(Reduce)
	stage.DeserializerFunc = d
	stage.r = reader
	stage.w = writer
	return stage
}
// Reduce performs the reduce operation. It ranges over the records produced
// by deserializeInput for this stage, folds every eligible record into a
// running Reducer via the reduce helper, and finally flushes the last
// accumulated value to w.
//
// A record is skipped when it is nil, when its Where method returns false,
// or when it does not implement Reducer.
func (m *Reduce) Reduce(w io.Writer) {
	var acc Reducer

	for rec := range deserializeInput(m.DeserializerFunc, m) {
		// Drop empty records and records rejected by their own filter.
		if rec == nil || !rec.Where() {
			continue
		}
		// Only records that implement Reducer take part in the reduction.
		if next, ok := rec.(Reducer); ok {
			acc = m.reduce(w, acc, next)
		}
	}

	// Nothing was accumulated, so there is nothing left to flush.
	if acc == nil {
		return
	}
	m.finalize(w, acc)
}
// reduce is the single folding step used by Reduce. Given the previously
// accumulated Reducer and the current one it returns the value to carry into
// the next iteration:
//
//   - prev is nil (first record): curr becomes the accumulator.
//   - prev and curr share a key: curr is summed into prev and prev is kept.
//   - keys differ: prev is finalized to w and curr starts a new accumulator.
func (m *Reduce) reduce(w io.Writer, prev Reducer, curr Reducer) Reducer {
	switch {
	case prev == nil:
		return curr
	case prev.Key() == curr.Key():
		prev.Sum(curr)
		return prev
	default:
		// Key changed: the previous group is complete, so flush it.
		m.finalize(w, prev)
		return curr
	}
}
// finalize flushes a completed Reducer to w. If r implements the Finalizer
// interface its Finalize method is called with w; otherwise r is asserted to
// be an Emitter and its Emit method is called with w.
func (m *Reduce) finalize(w io.Writer, r Reducer) {
	switch v := r.(type) {
	case Finalizer:
		v.Finalize(w)
	default:
		// Unchecked assertion: this panics if r does not implement Emitter.
		r.(Emitter).Emit(w)
	}
}
// In starts a goroutine that copies everything from r into the write side of
// the stage's pipe and returns the receiver so further stages can be chained.
//
// When the copy ends the write side is closed. If the copy failed, the error
// is passed to the read side with CloseWithError so that the consumer sees
// the failure instead of a clean end-of-input; on success the reader receives
// io.EOF exactly as with a plain Close.
func (m *Reduce) In(r io.Reader) Pipeline {
	go func(w *io.PipeWriter, r io.Reader) {
		_, err := io.Copy(w, r)
		// CloseWithError(nil) is equivalent to Close and it always returns nil.
		_ = w.CloseWithError(err)
	}(m.w, r)
	return m
}
// Then starts a goroutine that runs the reduce operation with p as its
// output and closes p once Reduce has returned. It returns p so that the
// next stage can be chained onto it.
func (m *Reduce) Then(p Pipeline) Pipeline {
	next := p
	go func() {
		m.Reduce(next)
		next.Close()
	}()
	return next
}
// Out runs the reduce operation in the calling goroutine, writing the
// reduced output to w. It returns when Reduce returns.
func (m *Reduce) Out(w io.Writer) {
m.Reduce(w)
}
// Close closes the write side of the stage's pipe and returns the error
// reported by that close.
func (m *Reduce) Close() error {
	return m.w.Close()
}
// Write writes p to the write side of the stage's pipe and returns the byte
// count and error reported by that write.
func (m *Reduce) Write(p []byte) (int, error) {
	return m.w.Write(p)
}
// Read reads from the read side of the stage's pipe into p and returns the
// byte count and error reported by that read.
func (m *Reduce) Read(p []byte) (int, error) {
	return m.r.Read(p)
}