-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtsv.go
138 lines (87 loc) · 2.44 KB
/
tsv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package pipeline
import (
"bufio"
"bytes"
"fmt"
"io"
"reflect"
)
// TSVDeserializer pairs a bufio.Scanner with a decoding function that turns
// the scanner's current token into an Emitter. Instances are built by
// NewTSVDeserializer.
type TSVDeserializer struct {
// constructor creates a fresh Emitter. It is stored by NewTSVDeserializer;
// none of the methods in this file read it directly.
constructor func() Emitter
// scanner supplies the input. HasNext advances it and Error reports its
// error state.
scanner *bufio.Scanner
// next produces the Emitter returned by Next.
next func() Emitter
}
// HasNext advances the underlying scanner by calling Scan and reports whether
// a new token was read. Because it advances the scanner, every call moves the
// input forward; it returns false once the scanner stops, either at the end of
// the input or on an error (see Error).
func (m *TSVDeserializer) HasNext() bool {
	advanced := m.scanner.Scan()
	return advanced
}
// Error returns whatever the underlying scanner's Err method reports.
func (m *TSVDeserializer) Error() error {
	scanErr := m.scanner.Err()
	return scanErr
}
// Next invokes the decoding function stored in the deserializer and returns
// the Emitter it produces.
func (m *TSVDeserializer) Next() Emitter {
	produce := m.next
	return produce()
}
// NewTSVDeserializer returns a DeserializerFunc that wraps an io.Reader in a
// TSVDeserializer. The reader is scanned line by line and each line is split
// on tab characters into columns.
//
// constructor must return a non-nil pointer to a struct: its result is
// inspected with reflection (Elem, NumField, FieldByIndex), which panics for
// anything else. constructor is called once per reader to discover the
// struct's fields and once more for every row to create the Emitter that Next
// returns.
//
// A row with fewer columns than the struct has top-level fields does not
// panic; the missing columns are handed to the field setters as nil.
//
// NOTE(review): field decoding is not implemented yet. The setters only print
// the field kind and the raw column text to standard output, and the Emitter
// returned by Next is left exactly as constructor produced it.
func NewTSVDeserializer(constructor func() Emitter) DeserializerFunc {
	return func(r io.Reader) Deserializer {
		// Scanner splits into lines.
		scanner := bufio.NewScanner(r)

		// Get the struct value the constructor's pointer refers to.
		ty := reflect.ValueOf(constructor()).Elem()

		// One setter per leaf field, in field declaration order.
		var setters []func(cols [][]byte)

		// columnAt returns cols[i], or nil when the row is too short to have
		// a column at that position.
		columnAt := func(cols [][]byte, i int) []byte {
			if i < 0 || i >= len(cols) {
				return nil
			}
			return cols[i]
		}

		var recurseStructFields func(root reflect.Value, index []int)
		recurseStructFields = func(root reflect.Value, index []int) {
			field := root.FieldByIndex(index)

			// NOTE(review): every leaf, including those inside nested
			// structs, reads the column of its top-level ancestor (index[0]).
			// Confirm whether flattened leaves should get their own columns.
			primaryIndex := index[0]

			switch field.Kind() {
			case reflect.Struct:
				// Structs should be flattened.
				for i := 0; i < field.NumField(); i++ {
					// Copy the path before extending it so sibling fields
					// never share (and overwrite) one backing array.
					nestedIndex := make([]int, len(index), len(index)+1)
					copy(nestedIndex, index)
					nestedIndex = append(nestedIndex, i)
					recurseStructFields(root, nestedIndex)
				}

			case reflect.Slice:
				// Slices should be turned into JSON (not implemented yet; the
				// setter only prints the kind and the raw column).
				kind := reflect.SliceOf(field.Type()).Kind()
				setters = append(setters, func(cols [][]byte) {
					column := columnAt(cols, primaryIndex)
					fmt.Println(kind, string(column))
				})

			default:
				// Plain values (not implemented yet; the setter only prints
				// the kind and the raw column).
				kind := field.Kind()
				setters = append(setters, func(cols [][]byte) {
					column := columnAt(cols, primaryIndex)
					fmt.Println(kind, string(column))
				})
			}
		}

		for i := 0; i < ty.NumField(); i++ {
			recurseStructFields(ty, []int{i})
		}

		// unmarshalEmitter runs every setter against the row's columns.
		// The Emitter argument is accepted but not yet written to.
		unmarshalEmitter := func(cols [][]byte, e Emitter) {
			for _, set := range setters {
				set(cols)
			}
		}

		// nextFunc takes the scanner's current line, splits it into columns,
		// and runs the setters against a new Emitter from the constructor.
		nextFunc := func() Emitter {
			// Get the current row.
			row := scanner.Bytes()

			// Split the row into columns.
			cols := bytes.Split(row, []byte("\t"))

			// Create a new Emitter from the constructor.
			e := constructor()
			unmarshalEmitter(cols, e)

			return e
		}

		return &TSVDeserializer{
			constructor: constructor,
			scanner:     scanner,
			next:        nextFunc,
		}
	}
}