forked from satyrius/gonx
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreducer.go
200 lines (181 loc) · 5.03 KB
/
reducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package gonx
// Reducer is the interface implemented by anything that reduces a channel of
// Entries.
//
// Reduce reads Entries from the input channel, does its job, and writes the
// result to the output channel.
//
// Reduce has no return value because it usually runs in a separate goroutine,
// where a channel is the handy way to retrieve the reduced data.
type Reducer interface {
	Reduce(input, output chan *Entry)
}
// ReadAll is a Reducer that passes input Entries straight through to the
// output channel. It carries no state.
type ReadAll struct{}
// Reduce forwards every Entry received from input to output without any
// modification, and closes output once input has been closed and drained.
// It is useful when you just want to read a file fast, asynchronously with
// the mapper routines.
func (r *ReadAll) Reduce(input chan *Entry, output chan *Entry) {
	for {
		entry, more := <-input
		if !more {
			// input is closed and fully drained.
			break
		}
		output <- entry
	}
	close(output)
}
// Count is a Reducer that counts the Entries it receives. It carries no
// state.
type Count struct{}
// Reduce counts the values received from input until it is closed, then
// writes a single new Entry whose "count" field holds the total to output
// and closes output.
func (r *Count) Reduce(input chan *Entry, output chan *Entry) {
	var total uint64
	// Only the number of received values matters, not their contents.
	for range input {
		total++
	}

	result := NewEmptyEntry()
	result.SetUintField("count", total)
	output <- result
	close(output)
}
// Sum is a Reducer that totals Entry values for the given fields.
type Sum struct {
	// Fields lists the names of the Entry fields to sum.
	Fields []string
}
// Reduce adds up the configured Fields over every Entry received from input
// and, once input is closed, writes a single Entry holding the per-field
// totals to output before closing it.
//
// A field for which FloatField returns an error on some Entry is skipped for
// that Entry; a field that was never read successfully is absent from the
// result.
func (r *Sum) Reduce(input chan *Entry, output chan *Entry) {
	totals := make(map[string]float64)
	for entry := range input {
		for _, field := range r.Fields {
			v, err := entry.FloatField(field)
			if err != nil {
				continue
			}
			totals[field] += v
		}
	}

	result := NewEmptyEntry()
	for field, total := range totals {
		result.SetFloatField(field, total)
	}
	output <- result
	close(output)
}
// Avg is a Reducer that calculates the average of Entry values for the given
// fields.
type Avg struct {
	// Fields lists the names of the Entry fields to average.
	Fields []string
}
// Reduce calculates the average value of each configured field over the
// Entries received from input. Once input is closed it writes a single Entry
// holding the per-field averages to output and then closes output.
//
// A field for which FloatField returns an error on some Entry is skipped for
// that Entry and does not count as a sample for that field. A field that was
// never read successfully is absent from the result.
func (r *Avg) Reduce(input chan *Entry, output chan *Entry) {
	avg := make(map[string]float64, len(r.Fields))
	// Number of samples folded into avg so far, tracked per field. A single
	// counter shared by all fields would weight the running mean by the wrong
	// sample count as soon as one entry lacks (or cannot parse) a field.
	samples := make(map[string]float64, len(r.Fields))
	for entry := range input {
		for _, name := range r.Fields {
			val, err := entry.FloatField(name)
			if err != nil {
				continue
			}
			n := samples[name]
			// Incremental mean: fold the new value into the mean of the
			// previous n samples.
			avg[name] = (avg[name]*n + val) / (n + 1)
			samples[name] = n + 1
		}
	}

	entry := NewEmptyEntry()
	for name, val := range avg {
		entry.SetFloatField(name, val)
	}
	output <- entry
	close(output)
}
// Chain is a Reducer that combines other reducers: it runs them over the same
// input and merges their results.
type Chain struct {
	// filters holds the chained items that satisfy Filter; they are applied
	// to each Entry before it is handed to the reducers.
	filters []Filter
	// reducers holds every chained item that is not a Filter.
	reducers []Reducer
}
// NewChain creates a new Chain from the given reducers. Each argument that
// also satisfies Filter is stored as a filter; all others are stored as
// plain reducers. The relative order within each group is preserved.
func NewChain(reducers ...Reducer) *Chain {
	chain := &Chain{}
	for _, r := range reducers {
		switch v := interface{}(r).(type) {
		case Filter:
			chain.filters = append(chain.filters, v)
		default:
			chain.reducers = append(chain.reducers, r)
		}
	}
	return chain
}
// Reduce applies the chain to the input channel of Entries and merges the
// results.
//
// Every sub-reducer is started in its own goroutine with a dedicated input
// and output channel. Each Entry read from input is passed through the
// filters in order and is dropped as soon as one of them returns nil. Each
// surviving Entry is sent to every sub-reducer. When input is closed, the
// sub-reducer inputs are closed, one result Entry is read from each
// sub-reducer output and merged into a new Entry, and that Entry is written
// to output, which is then closed.
func (r *Chain) Reduce(input chan *Entry, output chan *Entry) {
	// Give each sub-reducer its own pair of channels, buffered like the
	// master channels, and start it right away.
	n := len(r.reducers)
	ins := make([]chan *Entry, 0, n)
	outs := make([]chan *Entry, 0, n)
	for _, reducer := range r.reducers {
		in := make(chan *Entry, cap(input))
		out := make(chan *Entry, cap(output))
		ins = append(ins, in)
		outs = append(outs, out)
		go reducer.Reduce(in, out)
	}

	// Drain the master input channel.
	for entry := range input {
		for _, f := range r.filters {
			if entry = f.Filter(entry); entry == nil {
				break
			}
		}
		if entry == nil {
			// Nothing to publish for this entry.
			continue
		}
		// Fan the entry out to every sub-reducer.
		for _, in := range ins {
			in <- entry
		}
	}

	// Signal end of input to the sub-reducers.
	for _, in := range ins {
		close(in)
	}

	// Collect exactly one result from each sub-reducer and merge them.
	merged := NewEmptyEntry()
	for i := range outs {
		merged.Merge(<-outs[i])
	}
	output <- merged
	close(output)
}
// GroupBy is a Reducer that applies other reducers and returns their results
// grouped by the given fields.
type GroupBy struct {
	// Fields lists the names of the Entry fields that form the group key.
	Fields []string
	// reducers are the reducers applied to each group.
	reducers []Reducer
}
// NewGroupBy creates a new GroupBy Reducer that groups by fields and applies
// the given reducers to each group.
func NewGroupBy(fields []string, reducers ...Reducer) *GroupBy {
	g := new(GroupBy)
	g.Fields = fields
	g.reducers = reducers
	return g
}
// Reduce applies the configured reducers to the input Entries, grouped by
// Fields.
//
// Entries are grouped by the key returned from FieldsHash(r.Fields). The
// first Entry seen for a key starts a new Chain of the reducers in its own
// goroutine, with a dedicated input and output channel for that group. When
// input is closed, all group inputs are closed. For each group, the chain's
// result is then merged into the Partial(r.Fields) of the group's first Entry
// and the merged Entry is written to output. Groups are emitted in map
// iteration order, so their order is unspecified. output is closed at the
// end.
func (r *GroupBy) Reduce(input chan *Entry, output chan *Entry) {
	groupIn := make(map[string]chan *Entry)
	groupOut := make(map[string]chan *Entry)

	// Read the master input channel, creating a distinct input channel for
	// each key we group by.
	for entry := range input {
		key := entry.FieldsHash(r.Fields)
		in, seen := groupIn[key]
		if !seen {
			in = make(chan *Entry, cap(input))
			// One extra slot so the partial entry can be queued ahead of
			// the chain's own result without blocking.
			out := make(chan *Entry, cap(output)+1)
			out <- entry.Partial(r.Fields)
			groupIn[key] = in
			groupOut[key] = out
			go NewChain(r.reducers...).Reduce(in, out)
		}
		in <- entry
	}

	// Signal end of input to every group.
	for _, in := range groupIn {
		close(in)
	}

	// For each group the first value is the partial entry queued above and
	// the second is the chain's result.
	for _, out := range groupOut {
		grouped := <-out
		grouped.Merge(<-out)
		output <- grouped
	}
	close(output)
}