package fuego

import "fmt"
// NOTICE:
// The code in this file was inspired by Java Collectors,
// Vavr and, to some extent, Scala.
// Many thanks to those great projects!
// TODO: evolve towards a go-style decorator pattern?
// A Collector is a mutable reduction operation, optionally transforming the accumulated result.
//
// Collectors can be combined to express complex operations in a concise manner.
//
// In other words, a collector allows creating custom actions on a Stream. **fuego** ships with a
// number of methods such as `MapToInt`, `Head`, `LastN`, `Filter`, etc., and Collectors also provide a
// few additional methods. But what if you need something else, and it is not straightforward or
// readable to achieve by combining the existing methods fuego offers?
// Enter `Collector`:
// implement your own requirement functionally! Focus on *what* needs to be done in your streams (and
// delegate the details of the *how* to the implementation of your `Collector`).
//
// Type T: type of input elements to the reduction operation
// Type A: mutable accumulation type of the reduction operation (often hidden as an implementation detail)
// Type R: result type of the reduction operation.
type Collector[T, A, R any] struct {
supplier Supplier[A]
accumulator BiFunction[A, T, A]
// combiner BiFunction[A]/BinaryOperator[A] // this is for joining parallel collectors
finisher Function[A, R]
}
// NewCollector creates a new Collector.
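// A minimal sketch of a hand-rolled Collector (illustrative only; the element
// type and names below are made up): it counts string elements and renders the
// count as a message.
//
//	counting := NewCollector(
//		func() int { return 0 },                            // supplier: empty accumulation
//		func(count int, _ string) int { return count + 1 }, // accumulator: one more element seen
//		func(count int) string {                            // finisher: format the result
//			return fmt.Sprintf("%d elements", count)
//		},
//	)
//	// counting is a Collector[string, int, string] usable with Collect.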
func NewCollector[T, A, R any](supplier Supplier[A], accumulator BiFunction[A, T, A], finisher Function[A, R]) Collector[T, A, R] {
if supplier == nil {
panic(PanicCollectorMissingSupplier)
}
if accumulator == nil {
panic(PanicCollectorMissingAccumulator)
}
if finisher == nil {
panic(PanicCollectorMissingFinisher)
}
return Collector[T, A, R]{
supplier: supplier,
accumulator: accumulator,
finisher: finisher,
}
}
// type MutationCollector func(Function, Collector) Collector
// type Collecting func(MutationCollector) MutationCollector
// GroupingBy returns a Collector that groups the input elements according to
// the provided classifier function and reduces the values of each group with
// the provided downstream Collector.
//
// Type T: the type of the input elements
// Type K: the type of the keys
// Type A: the intermediate accumulation type of the downstream collector
// Type D: the result type of the downstream reduction
//
// classifier: a classifier function mapping input elements to keys
// downstream: a Collector implementing the downstream reduction
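// Example (a sketch; `words` is assumed to be a Stream[string] obtained elsewhere):
//
//	byLength := Collect(words, GroupingBy(
//		func(s string) int { return len(s) }, // classifier: group by word length
//		ToSlice[string](),                    // downstream: collect each group into a slice
//	))
//	// byLength is a map[int][]string keyed by word length.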
func GroupingBy[T any, K comparable, A, D any](classifier Function[T, K], downstream Collector[T, A, D]) Collector[T, map[K]A, map[K]D] {
supplier := func() map[K]A { return map[K]A{} }
accumulator := func(supply map[K]A, element T) map[K]A {
key := classifier(element)
container, ok := supply[key]
if !ok {
container = downstream.supplier()
}
container = downstream.accumulator(container, element)
supply[key] = container
return supply
}
finisher := func(e map[K]A) map[K]D {
m := map[K]D{}
for k, v := range e {
m[k] = downstream.finisher(v)
}
return m
}
return NewCollector(supplier, accumulator, finisher)
}
// Mapping adapts a Collector accepting elements of type U into a Collector accepting
// elements of type T, by applying the mapper function to each input element before it
// is accumulated downstream.
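// Example (a sketch; `words` is assumed to be a Stream[string] obtained elsewhere):
//
//	lengths := Collect(words, Mapping(
//		func(s string) int { return len(s) }, // mapper: string -> int
//		ToSlice[int](),                       // downstream collects the mapped values
//	))
//	// lengths is a []int holding the length of each word.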
func Mapping[T, U, A, R any](mapper Function[T, U], downstream Collector[U, A, R]) Collector[T, A, R] {
supplier := downstream.supplier
accumulator := func(supplier A, entry T) A {
return downstream.accumulator(supplier, mapper(entry))
}
finisher := downstream.finisher
return NewCollector(supplier, accumulator, finisher)
}
// FlatMapping adapts the elements a Collector accepts to another type by
// applying a flat-mapping function which maps each input element to a `Stream`;
// every element of that Stream is then accumulated by the downstream Collector.
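// Example (a sketch; `lines` is assumed to be a Stream[string] and `toWords` a
// StreamFunction[string, string] turning a line into a Stream of its words —
// neither construction is shown here):
//
//	allWords := Collect(lines, FlatMapping(toWords, ToSlice[string]()))
//	// allWords is a []string containing every word of every line.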
func FlatMapping[U, T, A, R any](mapper StreamFunction[T, U], collector Collector[U, A, R]) Collector[T, A, R] {
supplier := collector.supplier
accumulator := func(supplierA A, entry T) A {
container := supplierA
stream := mapper(entry)
stream.ForEach(
func(e U) {
container = collector.accumulator(container, e)
})
return container
}
finisher := collector.finisher
return NewCollector(supplier, accumulator, finisher)
}
// Filtering filters the entries a Collector accepts, retaining only those that satisfy the given predicate.
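// Example (a sketch; `numbers` is assumed to be a Stream[int] obtained elsewhere):
//
//	evens := Collect(numbers, Filtering(
//		func(n int) bool { return n%2 == 0 }, // keep even numbers only
//		ToSlice[int](),
//	))
//	// evens is a []int holding the even elements.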
func Filtering[T, A, R any](predicate Predicate[T], collector Collector[T, A, R]) Collector[T, A, R] {
supplier := collector.supplier
accumulator := func(supplier A, entry T) A {
if predicate(entry) {
return collector.accumulator(supplier, entry)
}
return supplier
}
finisher := collector.finisher
return NewCollector(supplier, accumulator, finisher)
}
// Reducing returns a collector that performs a reduction of
// its input elements using the provided BiFunction.
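// Example (a sketch; `numbers` is assumed to be a Stream[int] obtained elsewhere):
//
//	sum := Collect(numbers, Reducing(func(a, b int) int { return a + b }))
//	// sum is the total of the stream's elements. Note that the finisher below
//	// calls Get() on the Optional, so reducing an empty stream is expected to panic.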
func Reducing[T any](f2 BiFunction[T, T, T]) Collector[T, Optional[T], T] {
supplier := func() Optional[T] {
return OptionalEmpty[T]()
}
accumulator := func(supplierA Optional[T], entry T) Optional[T] {
result := entry
supplierA.IfPresent(func(val T) { result = f2(val, entry) })
return OptionalOf(result)
}
finisher := func(e Optional[T]) T {
// alternative:
// return e.OrElse(*new(T))
return e.Get()
}
return NewCollector(supplier, accumulator, finisher)
}
// ToSlice returns a collector that accumulates the input entries into a Go slice.
// Type T: type of the elements accumulated in the slice.
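// Example (a sketch; `words` is assumed to be a Stream[string] obtained elsewhere):
//
//	collected := Collect(words, ToSlice[string]())
//	// collected is a []string with the elements in their order of arrival.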
func ToSlice[T any]() Collector[T, []T, []T] {
supplier := func() []T { // TODO: use chan A instead with a finisher that converts to []A?
return []T{}
}
accumulator := func(supplier []T, element T) []T {
return append(supplier, element)
}
finisher := IdentityFinisher[[]T]
return NewCollector(supplier, accumulator, finisher)
}
// ToMap returns a collector that accumulates the input entries into a Go map.
// It panics if two entries map to the same key (see ToMapWithMerge for a merging alternative).
// Type T: type from which the elements are accumulated in the map.
// Type K: type of the keys derived from T.
// Type V: type of the values derived from T.
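// Example (a sketch; `words` is assumed to be a Stream[string] of distinct words
// obtained elsewhere — a repeated word would trigger the duplicate-key panic):
//
//	lengthByWord := Collect(words, ToMap(
//		func(s string) string { return s },   // keyMapper: the word itself
//		func(s string) int { return len(s) }, // valueMapper: its length
//	))
//	// lengthByWord is a map[string]int from each word to its length.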
func ToMap[T any, K comparable, V any](keyMapper Function[T, K], valueMapper Function[T, V]) Collector[T, map[K]V, map[K]V] {
supplier := func() map[K]V { // TODO: use chan instead with a finisher that converts to map?
return map[K]V{}
}
accumulator := func(supplier map[K]V, element T) map[K]V {
key := keyMapper(element)
value := valueMapper(element)
if _, ok := supplier[key]; !ok {
supplier[key] = value
return supplier
}
panic(fmt.Sprintf("%s: '%v'", PanicDuplicateKey, key))
}
finisher := IdentityFinisher[map[K]V]
return NewCollector(supplier, accumulator, finisher)
}
// ToMapWithMerge returns a collector that accumulates the input entries into a Go map.
// Key collisions are resolved by mergeFn, which combines the existing value with the new one.
// Type T: type from which the elements are accumulated in the map.
// Type K: type of the keys derived from T.
// Type V: type of the values derived from T.
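// Example (a sketch; `words` is assumed to be a Stream[string] obtained elsewhere):
//
//	countByWord := Collect(words, ToMapWithMerge(
//		func(s string) string { return s },  // keyMapper: the word itself
//		func(string) int { return 1 },       // valueMapper: each occurrence counts once
//		func(a, b int) int { return a + b }, // mergeFn: sum the counts on key collision
//	))
//	// countByWord is a map[string]int of word frequencies.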
func ToMapWithMerge[T any, K comparable, V any](keyMapper Function[T, K], valueMapper Function[T, V], mergeFn BiFunction[V, V, V]) Collector[T, map[K]V, map[K]V] {
supplier := func() map[K]V { // TODO: use chan instead with a finisher that converts to map?
return map[K]V{}
}
accumulator := func(supplier map[K]V, element T) map[K]V {
key := keyMapper(element)
value := valueMapper(element)
if _, ok := supplier[key]; !ok {
supplier[key] = value
return supplier
}
supplier[key] = mergeFn(supplier[key], value)
return supplier
}
finisher := IdentityFinisher[map[K]V]
return NewCollector(supplier, accumulator, finisher)
}
// IdentityFinisher is a basic finisher that returns the
// original value passed to it, unmodified.
func IdentityFinisher[T any](t T) T {
return t
}
// Collect reduces the stream with the supplied Collector, optionally transforming
// the accumulated result with the Collector's finisher.
//
// This is a continuous terminal operation and hence expects
// the producer to close the stream in order to complete.
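// Example (a sketch; `numbers` is assumed to be a Stream[int] whose producer closes
// the underlying channel once all elements have been sent — stream construction is
// not shown here):
//
//	total := Collect(numbers,
//		Filtering(func(n int) bool { return n > 0 },
//			Reducing(func(a, b int) int { return a + b })))
//	// total is the sum of the strictly positive elements of the stream.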
func Collect[T, A, R any](s Stream[T], c Collector[T, A, R]) R {
if s.stream == nil {
panic(PanicMissingChannel)
}
result := c.supplier()
for e := range s.stream {
result = c.accumulator(result, e)
}
finishedResult := c.finisher(result)
return finishedResult
}