Skip to content

Commit

Permalink
executor: fix csv parser (#9005) (#10269)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lingyu Song authored and jackysp committed Apr 26, 2019
1 parent 1138bd3 commit b51f531
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 23 deletions.
7 changes: 6 additions & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (s *testExecSuite) TestGetFieldsFromLine(c *C) {
`"\0\b\n\r\t\Z\\\ \c\'\""`,
[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
},
// Test mixed.
{
`"123",456,"\t7890",abcd`,
[]string{"123", "456", "\t7890", "abcd"},
},
}

ldInfo := LoadDataInfo{
Expand All @@ -212,7 +217,7 @@ func (s *testExecSuite) TestGetFieldsFromLine(c *C) {
}

_, err := ldInfo.getFieldsFromLine([]byte(`1,a string,100.20`))
c.Assert(err, NotNil)
c.Assert(err, IsNil)
}

func assertEqualStrings(c *C, got []field, expect []string) {
Expand Down
187 changes: 165 additions & 22 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package executor

import (
"bytes"
"context"
"fmt"
"strings"

Expand All @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/net/context"
)

// LoadDataExec represents a load data executor.
Expand Down Expand Up @@ -209,7 +208,6 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
if len(prevData) == 0 && len(curData) == 0 {
return nil, false, nil
}

var line []byte
var isEOF, hasStarting, reachLimit bool
if len(prevData) > 0 && len(curData) == 0 {
Expand All @@ -220,7 +218,6 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
for len(curData) > 0 {
line, curData, hasStarting = e.getLine(prevData, curData)
prevData = nil

// If it doesn't find the terminated symbol and this data isn't the last data,
// the data can't be inserted.
if line == nil && !isEOF {
Expand Down Expand Up @@ -300,28 +297,174 @@ func (e *LoadDataInfo) addRecordLD(row []types.Datum) (int64, error) {
type field struct {
str []byte
maybeNull bool
enclosed bool
}

type fieldWriter struct {
pos int
enclosedChar byte
fieldTermChar byte
term *string
isEnclosed bool
isLineStart bool
isFieldStart bool
ReadBuf *[]byte
OutputBuf []byte
}

func (w *fieldWriter) Init(enclosedChar byte, fieldTermChar byte, readBuf *[]byte, term *string) {
w.isEnclosed = false
w.isLineStart = true
w.isFieldStart = true
w.ReadBuf = readBuf
w.enclosedChar = enclosedChar
w.fieldTermChar = fieldTermChar
w.term = term
}

func (w *fieldWriter) putback() {
w.pos--
}

func (w *fieldWriter) getChar() (bool, byte) {
if w.pos < len(*w.ReadBuf) {
ret := (*w.ReadBuf)[w.pos]
w.pos++
return true, ret
}
return false, 0
}

func (w *fieldWriter) isTerminator() bool {
chkpt, isterm := w.pos, true
for i := 1; i < len(*w.term); i++ {
flag, ch := w.getChar()
if !flag || ch != (*w.term)[i] {
isterm = false
break
}
}
if !isterm {
w.pos = chkpt
return false
}
return true
}

func (w *fieldWriter) outputField(enclosed bool) field {
var fild []byte
start := 0
if enclosed {
start = 1
}
for i := start; i < len(w.OutputBuf); i++ {
fild = append(fild, w.OutputBuf[i])
}
if len(fild) == 0 {
fild = []byte("")
}
w.OutputBuf = w.OutputBuf[0:0]
w.isEnclosed = false
w.isFieldStart = true
return field{fild, false, enclosed}
}

func (w *fieldWriter) GetField() (bool, field) {
// The first return value implies whether fieldWriter read the last character of line.
if w.isLineStart {
_, ch := w.getChar()
if ch == w.enclosedChar {
w.isEnclosed = true
w.isFieldStart, w.isLineStart = false, false
w.OutputBuf = append(w.OutputBuf, ch)
} else {
w.putback()
}
}
for {
flag, ch := w.getChar()
if !flag {
ret := w.outputField(false)
return true, ret
}
if ch == w.enclosedChar && w.isFieldStart {
// If read enclosed char at field start.
w.isEnclosed = true
w.OutputBuf = append(w.OutputBuf, ch)
w.isLineStart, w.isFieldStart = false, false
continue
}
w.isLineStart, w.isFieldStart = false, false
if ch == w.fieldTermChar && !w.isEnclosed {
// If read filed terminate char.
if w.isTerminator() {
ret := w.outputField(false)
return false, ret
}
w.OutputBuf = append(w.OutputBuf, ch)
} else if ch == w.enclosedChar && w.isEnclosed {
// If read enclosed char, look ahead.
flag, ch = w.getChar()
if !flag {
ret := w.outputField(true)
return true, ret
} else if ch == w.enclosedChar {
w.OutputBuf = append(w.OutputBuf, ch)
continue
} else if ch == w.fieldTermChar {
// If the next char is fieldTermChar, look ahead.
if w.isTerminator() {
ret := w.outputField(true)
return false, ret
}
w.OutputBuf = append(w.OutputBuf, ch)
} else {
// If there is no terminator behind enclosedChar, put the char back.
w.OutputBuf = append(w.OutputBuf, w.enclosedChar)
w.putback()
}
} else if ch == '\\' {
// TODO: escape only support '\'
w.OutputBuf = append(w.OutputBuf, ch)
flag, ch = w.getChar()
if flag {
if ch == w.enclosedChar {
w.OutputBuf = append(w.OutputBuf, ch)
} else {
w.putback()
}
}
} else {
w.OutputBuf = append(w.OutputBuf, ch)
}
}
}

// getFieldsFromLine splits line according to fieldsInfo.
func (e *LoadDataInfo) getFieldsFromLine(line []byte) ([]field, error) {
var sep []byte
if e.FieldsInfo.Enclosed != 0 {
if line[0] != e.FieldsInfo.Enclosed || line[len(line)-1] != e.FieldsInfo.Enclosed {
return nil, errors.Errorf("line %s should begin and end with %c", string(line), e.FieldsInfo.Enclosed)
var (
reader fieldWriter
fields []field
)

if len(line) == 0 {
str := []byte("")
fields = append(fields, field{str, false, false})
return fields, nil
}

reader.Init(e.FieldsInfo.Enclosed, e.FieldsInfo.Terminated[0], &line, &e.FieldsInfo.Terminated)
for {
eol, f := reader.GetField()
f = f.escape()
if string(f.str) == "NULL" && !f.enclosed {
f.str = []byte{'N'}
f.maybeNull = true
}
fields = append(fields, f)
if eol {
break
}
line = line[1 : len(line)-1]
sep = make([]byte, 0, len(e.FieldsInfo.Terminated)+2)
sep = append(sep, e.FieldsInfo.Enclosed)
sep = append(sep, e.FieldsInfo.Terminated...)
sep = append(sep, e.FieldsInfo.Enclosed)
} else {
sep = []byte(e.FieldsInfo.Terminated)
}
rawCols := bytes.Split(line, sep)
fields := make([]field, 0, len(rawCols))
for _, v := range rawCols {
f := field{v, false}
fields = append(fields, f.escape())
}
return fields, nil
}
Expand All @@ -341,7 +484,7 @@ func (f *field) escape() field {
f.str[pos] = c
pos++
}
return field{f.str[:pos], f.maybeNull}
return field{f.str[:pos], f.maybeNull, f.enclosed}
}

func (f *field) escapeChar(c byte) byte {
Expand Down
Loading

0 comments on commit b51f531

Please sign in to comment.