sum_concurrent.go
// Copyright (c) Efficient Go Authors
// Licensed under the Apache License 2.0.

package sum

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"sync"
	"sync/atomic"

	"github.com/efficientgo/core/errcapture"
	"github.com/efficientgo/core/errors"
)

// ConcurrentSum1 performs the sum concurrently. It is a lot slower than ConcurrentSum3, an example of pessimization.
// Read more in "Efficient Go"; Example 10-10.
func ConcurrentSum1(fileName string) (ret int64, _ error) {
b, err := ioutil.ReadFile(fileName)
if err != nil {
return 0, err
}
var wg sync.WaitGroup
var last int
for i := 0; i < len(b); i++ {
if b[i] != '\n' {
continue
}
wg.Add(1)
		go func(line []byte) { // Creating a goroutine per line turns out to be memory-intensive at scale (on top of the time cost)!
defer wg.Done()
num, err := ParseInt(line)
if err != nil {
// TODO(bwplotka): Return err using other channel.
return
}
atomic.AddInt64(&ret, num)
}(b[last:i])
last = i + 1
}
wg.Wait()
return ret, nil
}

// ConcurrentSum2 performs the sum concurrently. It is a lot slower than ConcurrentSum3, an example of pessimization.
// Read more in "Efficient Go"; Example 10-11.
func ConcurrentSum2(fileName string, workers int) (ret int64, _ error) {
b, err := ioutil.ReadFile(fileName)
if err != nil {
return 0, err
}
var (
wg = sync.WaitGroup{}
workCh = make(chan []byte, workers)
)
wg.Add(workers + 1)
go func() {
var last int
for i := 0; i < len(b); i++ {
if b[i] != '\n' {
continue
}
workCh <- b[last:i]
last = i + 1
}
close(workCh)
wg.Done()
}()
for i := 0; i < workers; i++ {
go func() {
var sum int64
			for line := range workCh { // Common mistake: for _, line := range <-workCh (see the sketch after this function).
num, err := ParseInt(line)
if err != nil {
// TODO(bwplotka): Return err using other channel.
continue
}
sum += num
}
atomic.AddInt64(&ret, sum)
wg.Done()
}()
}
wg.Wait()
return ret, nil
}
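
// Illustrative sketch, not part of the original file: the "common mistake"
// flagged in ConcurrentSum2. Ranging over `<-workCh` receives a single []byte
// from the channel and iterates over its bytes; ranging over `workCh` itself
// receives every value until the channel is closed. The channel contents
// below are made up for the example.
func exampleRangeOverChannelSketch() {
	workCh := make(chan []byte, 2)
	workCh <- []byte("10")
	workCh <- []byte("20")
	close(workCh)

	// Wrong: iterates over the bytes '1' and '0' of one received slice.
	for _, b := range <-workCh {
		fmt.Println("byte:", b)
	}
	// Right (as in ConcurrentSum2): receives each remaining line from the channel.
	for line := range workCh {
		fmt.Println("line:", string(line))
	}
}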

// Over the inline budget, but better for readability. Consider splitting the function if needed to get it inlined:
// ./sum_concurrent.go:11:6: cannot inline shardedRange: function too complex: cost 95 exceeds budget 80
func shardedRange(routineNumber int, bytesPerWorker int, b []byte) (int, int) {
begin := routineNumber * bytesPerWorker
end := begin + bytesPerWorker
if end+bytesPerWorker > len(b) {
end = len(b)
}
// Find last newline before begin and add 1. If not found (-1), it means we
// are at the start. Otherwise, we start after last newline.
return bytes.LastIndex(b[:begin], []byte("\n")) + 1, end
}
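
// Illustrative sketch, not part of the original file: shows how shardedRange
// splits a small newline-delimited buffer between two workers. The buffer and
// worker count below are made up for the example. With 12 bytes and
// bytesPerWorker = 6, worker 0 gets [0, 6) and worker 1 gets [6, 12); each
// shard starts at the beginning of a number, so no number is split across workers.
func exampleShardedRangeSketch() {
	b := []byte("12\n34\n56\n78\n")
	workers := 2
	bytesPerWorker := len(b) / workers
	for i := 0; i < workers; i++ {
		begin, end := shardedRange(i, bytesPerWorker, b)
		fmt.Printf("worker %d: bytes [%d, %d) -> %q\n", i, begin, end, b[begin:end])
	}
}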

// ConcurrentSum3 uses coordination-free sharding to perform the computation more efficiently.
// Read more in "Efficient Go"; Example 10-12.
func ConcurrentSum3(fileName string, workers int) (ret int64, _ error) {
b, err := ioutil.ReadFile(fileName)
if err != nil {
return 0, err
}
var (
bytesPerWorker = len(b) / workers
resultCh = make(chan int64)
)
for i := 0; i < workers; i++ {
go func(i int) {
// Coordination-free algorithm, which shards buffered file deterministically.
begin, end := shardedRange(i, bytesPerWorker, b)
var sum int64
for last := begin; begin < end; begin++ {
if b[begin] != '\n' {
continue
}
num, err := ParseInt(b[last:begin])
if err != nil {
// TODO(bwplotka): Return err using other channel.
continue
}
sum += num
last = begin + 1
}
resultCh <- sum
}(i)
}
for i := 0; i < workers; i++ {
ret += <-resultCh
}
close(resultCh)
return ret, nil
}

// shardedRangeFromReaderAt computes the [begin, end) byte range for the given
// worker without buffering the whole file: it looks back at most maxNumSize
// bytes before the naive shard boundary to align begin with the start of a number.
func shardedRangeFromReaderAt(routineNumber int, bytesPerWorker int, size int, f io.ReaderAt) (int, int) {
begin := routineNumber * bytesPerWorker
end := begin + bytesPerWorker
if end+bytesPerWorker > size {
end = size
}
if begin == 0 {
return begin, end
}
	// Look back up to maxNumSize bytes for the newline preceding begin, so this
	// shard starts at the beginning of a number. This relies on every line
	// (number plus its newline) being at most maxNumSize bytes long.
	const maxNumSize = 10
buf := make([]byte, maxNumSize)
begin -= maxNumSize
if _, err := f.ReadAt(buf, int64(begin)); err != nil {
// TODO(bwplotka): Return err using other channel.
fmt.Println(err)
return 0, 0
}
for i := maxNumSize; i > 0; i-- {
if buf[i-1] == '\n' {
begin += i
break
}
}
return begin, end
}
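
// Illustrative sketch, not part of the original file: shardedRangeFromReaderAt
// computed against an in-memory reader. bytes.Reader implements io.ReaderAt,
// so we can verify that each shard begins at the start of a number without
// touching the disk. The data and worker count are made up for the example.
func exampleShardedRangeFromReaderAtSketch() {
	data := []byte("100\n100\n100\n100\n100\n100\n100\n100\n") // 32 bytes, 8 lines.
	r := bytes.NewReader(data)
	workers := 2
	bytesPerWorker := len(data) / workers // 16 bytes per worker, above maxNumSize.
	for i := 0; i < workers; i++ {
		begin, end := shardedRangeFromReaderAt(i, bytesPerWorker, len(data), r)
		fmt.Printf("worker %d: bytes [%d, %d) -> %q\n", i, begin, end, data[begin:end])
	}
}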

// ConcurrentSum4 is like ConcurrentSum3, but it also reads the file in a sharded way.
// Read more in "Efficient Go"; Example 10-13.
func ConcurrentSum4(fileName string, workers int) (ret int64, _ error) {
f, err := os.Open(fileName)
if err != nil {
return 0, err
}
defer errcapture.Do(&err, f.Close, "close file")
s, err := f.Stat()
if err != nil {
return 0, err
}
var (
size = int(s.Size())
bytesPerWorker = size / workers
resultCh = make(chan int64)
)
if bytesPerWorker < 10 {
		return 0, errors.New("can't have fewer than 10 bytes per goroutine")
}
for i := 0; i < workers; i++ {
go func(i int) {
begin, end := shardedRangeFromReaderAt(i, bytesPerWorker, size, f)
r := io.NewSectionReader(f, int64(begin), int64(end-begin))
b := make([]byte, 8*1024)
sum, err := Sum6Reader(r, b)
if err != nil {
// TODO(bwplotka): Return err using other channel.
fmt.Println(err)
}
resultCh <- sum
}(i)
}
for i := 0; i < workers; i++ {
ret += <-resultCh
}
close(resultCh)
return ret, nil
}
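
// Usage sketch, not part of the original file: sums a newline-delimited file
// of integers with each variant. The file name and worker count below are
// placeholders, not values from this repository.
func exampleConcurrentSumsSketch() {
	const fileName = "testdata/input.txt" // Hypothetical input path.
	const workers = 4

	if sum, err := ConcurrentSum1(fileName); err == nil {
		fmt.Println("ConcurrentSum1:", sum)
	}
	if sum, err := ConcurrentSum2(fileName, workers); err == nil {
		fmt.Println("ConcurrentSum2:", sum)
	}
	if sum, err := ConcurrentSum3(fileName, workers); err == nil {
		fmt.Println("ConcurrentSum3:", sum)
	}
	if sum, err := ConcurrentSum4(fileName, workers); err == nil {
		fmt.Println("ConcurrentSum4:", sum)
	}
}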