-
Notifications
You must be signed in to change notification settings - Fork 0
/
scanner.go
156 lines (141 loc) · 3.07 KB
/
scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"bufio"
"fmt"
"io"
"os"
)
// scanner is the common interface of the types that walk the records of a buffer, both the
// portion held in memory (memoryScanner) and the portion stored on disk (fileScanner).
//
// Implementations reuse their record buffers, so a slice returned by nextRecord or lastRecord is
// only guaranteed to stay intact until the following call to next; copy it to keep it longer.
type scanner interface {
	// next advances to the following record and reports whether one was available.
	next() bool
	// nextRecord returns the record loaded by the most recent call to next.
	nextRecord() []byte
	// lastRecord returns the record that was current before the most recent call to next.
	lastRecord() []byte
	// err returns the error that stopped the scan, or nil.
	err() error
}
// memoryScanner reads the records held in the in-memory portion of a buffer, in index order.
type memoryScanner struct {
	index    int     // index of the current record; newMemoryScanner starts it at -1
	buf      *buffer // buffer whose records are being scanned
	lst, nxt []byte  // previous and current record; next swaps and reuses their backing arrays
}
// next advances to the following record in the buffer and reports whether there was one.
//
// Each record has a 32-byte entry in s.buf.buf starting at offset index*32:
//
//	+0   the record length n (decoded with readInt)
//	+8   the first min(n, 16) bytes of the record, stored inline
//	+24  for records longer than 16 bytes, the offset in s.buf.buf of the remaining n-16 bytes
//
// The record is assembled in s.nxt. The previous s.nxt becomes s.lst, and the old s.lst backing
// array is reused for the new record. It is grown to the smallest multiple of 4096 greater than n
// when it is too small.
func (s *memoryScanner) next() bool {
	s.lst, s.nxt = s.nxt, s.lst
	s.index++
	if s.index >= s.buf.Len() {
		return false
	}
	entry := s.index * 32
	n := readInt(s.buf.buf, entry)
	if n > cap(s.nxt) {
		// n/4096 (not n%4096) guarantees the new capacity exceeds n, so the reslice below
		// cannot go out of range.
		s.nxt = make([]byte, 4096*(n/4096)+4096)
	}
	s.nxt = s.nxt[:n]
	inline := n
	if inline > 16 {
		inline = 16
	}
	copy(s.nxt[:inline], s.buf.buf[entry+8:entry+8+inline])
	if n > 16 {
		// The bytes past the inline prefix live elsewhere in the buffer.
		p := readInt(s.buf.buf, entry+24)
		copy(s.nxt[16:], s.buf.buf[p:p+n-16])
	}
	return true
}
// lastRecord returns the record that was current before the most recent call to next. The slice
// aliases one of the scanner's two reusable record buffers.
func (s *memoryScanner) lastRecord() []byte { return s.lst }

// nextRecord returns the record loaded by the most recent call to next; its contents are only
// meaningful if that call returned true. The slice aliases one of the scanner's two reusable
// record buffers.
func (s *memoryScanner) nextRecord() []byte { return s.nxt }

// err always returns nil: memoryScanner never records an error.
func (s *memoryScanner) err() error { return nil }
// newMemoryScanner returns a scanner over b positioned before its first record; the first call to
// next loads record 0.
func newMemoryScanner(b *buffer) *memoryScanner {
	s := &memoryScanner{buf: b}
	s.index = -1 // next increments the index before reading
	return s
}
// fileScanner reads front-compressed records sequentially from a file.
type fileScanner struct {
	f        *os.File      // file being read; next closes it on reaching the end of input
	r        *bufio.Reader // buffered reader over f
	e        error         // sticky error returned by err; once set, next returns false
	lst, nxt []byte        // previous and current record; next swaps and reuses their buffers
}
// next advances the scanner to the next record and reports whether one was read.
//
// Records are front compressed. Each is stored as a varint prefix length pn, a varint suffix
// length rn, and then rn suffix bytes. The record is rebuilt from the first pn bytes of the
// previous record followed by the suffix read from the file. next is also responsible for
// growing the record buffer (in 4096-byte steps) when it is too small to hold the next record.
//
// When next returns false the scan is over. The file has been closed, and err reports what went
// wrong; a clean end of file is not an error. Any further call returns false without reading the
// file again. A corrupt header (a prefix longer than the previous record, or a negative or
// overflowing length) is reported as an error instead of causing a panic.
func (s *fileScanner) next() bool {
	// finish clears s.r, so a scanner that has already stopped never touches its closed file.
	if s.e != nil || s.r == nil {
		return false
	}
	s.lst, s.nxt = s.nxt, s.lst
	pn, err := readVarInt(s.r)
	if err == io.EOF {
		// The input ended cleanly, between two records.
		return s.finish(nil)
	}
	if err != nil {
		return s.finish(fmt.Errorf("error reading prefix length from file: %w", err))
	}
	rn, err := readVarInt(s.r)
	if err != nil {
		return s.finish(fmt.Errorf("error reading record length from file: %w", err))
	}
	n := pn + rn
	// A negative value here can only come from an overflowing varint or an overflowing sum.
	if pn < 0 || rn < 0 || n < 0 || pn > len(s.lst) {
		return s.finish(fmt.Errorf("invalid record header in file: prefix length %d, suffix length %d, previous record length %d", pn, rn, len(s.lst)))
	}
	if n > cap(s.nxt) {
		s.nxt = make([]byte, 4096*(n/4096)+4096)
	}
	s.nxt = s.nxt[:n]
	copy(s.nxt[:pn], s.lst[:pn])
	if _, err := io.ReadFull(s.r, s.nxt[pn:]); err != nil {
		return s.finish(fmt.Errorf("error reading record from file: %w", err))
	}
	return true
}

// finish ends the scan. It records err (nil for a clean end of input), closes the file, and
// detaches the reader so later calls to next return false immediately. A close failure is
// recorded only when no earlier error was. It always returns false so callers can write
// `return s.finish(err)`.
func (s *fileScanner) finish(err error) bool {
	s.e = err
	if cerr := s.f.Close(); s.e == nil && cerr != nil {
		s.e = fmt.Errorf("error closing file: %w", cerr)
	}
	s.r = nil
	return false
}
// lastRecord returns the record that was current before the most recent call to next. The slice
// aliases one of the scanner's two reusable record buffers.
func (s *fileScanner) lastRecord() []byte { return s.lst }

// nextRecord returns the record loaded by the most recent call to next; its contents are only
// meaningful if that call returned true. The slice aliases one of the scanner's two reusable
// record buffers.
func (s *fileScanner) nextRecord() []byte { return s.nxt }

// err returns the error that stopped the scan (including a failure to open the file), or nil.
// Reaching the end of the file cleanly is not an error.
func (s *fileScanner) err() error { return s.e }
// newFileScanner opens filename for scanning. It never returns nil. If the file cannot be opened,
// the returned scanner holds the open error: err reports it, and next returns false.
func newFileScanner(filename string) *fileScanner {
	f, openErr := os.Open(filename)
	if openErr != nil {
		return &fileScanner{e: openErr}
	}
	return &fileScanner{f: f, r: bufio.NewReader(f)}
}
// readVarInt reads an unsigned varint from r and returns it as an int. The encoding uses 7 payload
// bits per byte, least significant group first, with the high bit set on every byte except the
// last.
//
// It returns io.EOF, unwrapped, iff r is exhausted before the first byte of the varint. This lets
// callers tell a clean end of input from a truncated one. Every other failure is returned wrapped
// with context and with -1 as the value. Those failures are: a read error, input ending part way
// through the varint (wrapping io.ErrUnexpectedEOF), or a value too large for an int.
func readVarInt(r *bufio.Reader) (int, error) {
	const maxInt = int(^uint(0) >> 1)
	var v uint64
	for shift := uint(0); ; shift += 7 {
		b, err := r.ReadByte()
		if err != nil {
			if shift == 0 && err == io.EOF {
				return -1, io.EOF
			}
			if err == io.EOF {
				// The input ended in the middle of a varint.
				err = io.ErrUnexpectedEOF
			}
			return -1, fmt.Errorf("error reading varint byte: %w", err)
		}
		// At shift 63 only a single payload bit still fits in 64 bits and no continuation bit may
		// follow; anything larger would silently lose bits.
		if shift == 63 && b > 1 {
			return -1, fmt.Errorf("error reading varint: value overflows int")
		}
		v |= uint64(b&0x7F) << shift
		if b&0x80 == 0 {
			break
		}
	}
	if v > uint64(maxInt) {
		return -1, fmt.Errorf("error reading varint: value overflows int")
	}
	return int(v), nil
}