diff --git a/pkg/input/constants.go b/pkg/input/constants.go new file mode 100644 index 0000000000..42030c3eb1 --- /dev/null +++ b/pkg/input/constants.go @@ -0,0 +1,3 @@ +package input + +const CSV_BOM = "\xef\xbb\xbf" diff --git a/pkg/input/line_reader.go b/pkg/input/line_reader.go new file mode 100644 index 0000000000..c6b2726098 --- /dev/null +++ b/pkg/input/line_reader.go @@ -0,0 +1,126 @@ +// This file contains the interface for file-format-specific record-readers, as +// well as a collection of utility functions. + +package input + +import ( + "bufio" + "container/list" + "io" +) + +type ILineReader interface { + Scan() bool + Text() string +} + +type TLineReader struct { + scanner *bufio.Scanner +} + +// NewLineReader handles reading lines which may be delimited by multi-line separators, +// e.g. "\xe2\x90\x9e" for USV. +func NewLineReader(handle io.Reader, irs string) *TLineReader { + scanner := bufio.NewScanner(handle) + + if irs == "\n" || irs == "\r\n" { + // Handled by default scanner. + } else { + irsbytes := []byte(irs) + irslen := len(irsbytes) + + // Custom splitter + recordSplitter := func( + data []byte, + atEOF bool, + ) ( + advance int, + token []byte, + err error, + ) { + datalen := len(data) + end := datalen - irslen + for i := 0; i <= end; i++ { + if data[i] == irsbytes[0] { + match := true + for j := 1; j < irslen; j++ { + if data[i+j] != irsbytes[j] { + match = false + break + } + } + if match { + return i + irslen, data[:i], nil + } + } + } + if !atEOF { + return 0, nil, nil + } + // There is one final token to be delivered, which may be the empty string. + // Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this + // but does not trigger an error to be returned from Scan itself. + return 0, data, bufio.ErrFinalToken + } + + scanner.Split(recordSplitter) + } + + return &TLineReader{ + scanner: scanner, + } +} + +func (r *TLineReader) Scan() bool { + return r.scanner.Scan() +} + +func (r *TLineReader) Text() string { + return r.scanner.Text() +} + +// TODO: comment copiously +// +// Lines are written to the channel with their trailing newline (or whatever +// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n". +func channelizedLineReader( + lineReader ILineReader, + linesChannel chan<- *list.List, + downstreamDoneChannel <-chan bool, // for mlr head + recordsPerBatch int64, +) { + i := int64(0) + done := false + + lines := list.New() + + for lineReader.Scan() { + i++ + + lines.PushBack(lineReader.Text()) + + // See if downstream processors will be ignoring further data (e.g. mlr + // head). If so, stop reading. This makes 'mlr head hugefile' exit + // quickly, as it should. + if i%recordsPerBatch == 0 { + select { + case _ = <-downstreamDoneChannel: + done = true + break + default: + break + } + if done { + break + } + linesChannel <- lines + lines = list.New() + } + + if done { + break + } + } + linesChannel <- lines + close(linesChannel) // end-of-stream marker +} diff --git a/pkg/input/record_reader.go b/pkg/input/record_reader.go index 096060e629..62a411f222 100644 --- a/pkg/input/record_reader.go +++ b/pkg/input/record_reader.go @@ -4,19 +4,11 @@ package input import ( - "bufio" "container/list" - "io" - "regexp" - "strings" - "github.com/johnkerl/miller/pkg/cli" - "github.com/johnkerl/miller/pkg/lib" "github.com/johnkerl/miller/pkg/types" ) -const CSV_BOM = "\xef\xbb\xbf" - // Since Go is concurrent, the context struct (AWK-like variables such as // FILENAME, NF, NF, FNR, etc.) needs to be duplicated and passed through the // channels along with each record. Hence the initial context, which readers @@ -32,166 +24,3 @@ type IRecordReader interface { downstreamDoneChannel <-chan bool, // for mlr head ) } - -// NewLineScanner handles read lines which may be delimited by multi-line separators, -// e.g. "\xe2\x90\x9e" for USV. -func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner { - scanner := bufio.NewScanner(handle) - - // Handled by default scanner. - if irs == "\n" || irs == "\r\n" { - return scanner - } - - irsbytes := []byte(irs) - irslen := len(irsbytes) - - // Custom splitter - recordSplitter := func( - data []byte, - atEOF bool, - ) ( - advance int, - token []byte, - err error, - ) { - datalen := len(data) - end := datalen - irslen - for i := 0; i <= end; i++ { - if data[i] == irsbytes[0] { - match := true - for j := 1; j < irslen; j++ { - if data[i+j] != irsbytes[j] { - match = false - break - } - } - if match { - return i + irslen, data[:i], nil - } - } - } - if !atEOF { - return 0, nil, nil - } - // There is one final token to be delivered, which may be the empty string. - // Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this - // but does not trigger an error to be returned from Scan itself. - return 0, data, bufio.ErrFinalToken - } - - scanner.Split(recordSplitter) - - return scanner -} - -// TODO: comment copiously -// -// Lines are written to the channel with their trailing newline (or whatever -// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n". -func channelizedLineScanner( - lineScanner *bufio.Scanner, - linesChannel chan<- *list.List, - downstreamDoneChannel <-chan bool, // for mlr head - recordsPerBatch int64, -) { - i := int64(0) - done := false - - lines := list.New() - - for lineScanner.Scan() { - i++ - - lines.PushBack(lineScanner.Text()) - - // See if downstream processors will be ignoring further data (e.g. mlr - // head). If so, stop reading. This makes 'mlr head hugefile' exit - // quickly, as it should. - if i%recordsPerBatch == 0 { - select { - case _ = <-downstreamDoneChannel: - done = true - break - default: - break - } - if done { - break - } - linesChannel <- lines - lines = list.New() - } - - if done { - break - } - } - linesChannel <- lines - close(linesChannel) // end-of-stream marker -} - -// IPairSplitter splits a string into left and right, e.g. for IPS. -// This helps us reuse code for splitting by IPS string, or IPS regex. -type iPairSplitter interface { - Split(input string) []string -} - -func newPairSplitter(options *cli.TReaderOptions) iPairSplitter { - if options.IPSRegex == nil { - return &tIPSSplitter{ips: options.IPS} - } else { - return &tIPSRegexSplitter{ipsRegex: options.IPSRegex} - } -} - -type tIPSSplitter struct { - ips string -} - -func (s *tIPSSplitter) Split(input string) []string { - return strings.SplitN(input, s.ips, 2) -} - -type tIPSRegexSplitter struct { - ipsRegex *regexp.Regexp -} - -func (s *tIPSRegexSplitter) Split(input string) []string { - return lib.RegexCompiledSplitString(s.ipsRegex, input, 2) -} - -// IFieldSplitter splits a string into pieces, e.g. for IFS. -// This helps us reuse code for splitting by IFS string, or IFS regex. -type iFieldSplitter interface { - Split(input string) []string -} - -func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter { - if options.IFSRegex == nil { - return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS} - } else { - return &tIFSRegexSplitter{ifsRegex: options.IFSRegex} - } -} - -type tIFSSplitter struct { - ifs string - allowRepeatIFS bool -} - -func (s *tIFSSplitter) Split(input string) []string { - fields := lib.SplitString(input, s.ifs) - if s.allowRepeatIFS { - fields = lib.StripEmpties(fields) // left/right trim - } - return fields -} - -type tIFSRegexSplitter struct { - ifsRegex *regexp.Regexp -} - -func (s *tIFSRegexSplitter) Split(input string) []string { - return lib.RegexCompiledSplitString(s.ifsRegex, input, -1) -} diff --git a/pkg/input/record_reader_csvlite.go b/pkg/input/record_reader_csvlite.go index 3664d6ea33..bfc1887196 100644 --- a/pkg/input/record_reader_csvlite.go +++ b/pkg/input/record_reader_csvlite.go @@ -144,9 +144,9 @@ func (reader *RecordReaderCSVLite) processHandle( reader.headerStrings = nil recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel) diff --git a/pkg/input/record_reader_dkvp_nidx.go b/pkg/input/record_reader_dkvp_nidx.go index 5cd92f77d0..943fbcb5e7 100644 --- a/pkg/input/record_reader_dkvp_nidx.go +++ b/pkg/input/record_reader_dkvp_nidx.go @@ -101,9 +101,9 @@ func (reader *RecordReaderDKVPNIDX) processHandle( context.UpdateForStartOfFile(filename) recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.getRecordBatch(linesChannel, errorChannel, context) diff --git a/pkg/input/record_reader_json.go b/pkg/input/record_reader_json.go index 27b9b8e2c9..1607fb0a51 100644 --- a/pkg/input/record_reader_json.go +++ b/pkg/input/record_reader_json.go @@ -1,7 +1,6 @@ package input import ( - "bufio" "container/list" "fmt" "io" @@ -203,7 +202,7 @@ func (reader *RecordReaderJSON) processHandle( // JSONCommentEnabledReader implements io.Reader to strip comment lines // off of CSV data. type JSONCommentEnabledReader struct { - lineScanner *bufio.Scanner + lineReader ILineReader readerOptions *cli.TReaderOptions context *types.Context // Needed for channelized stdout-printing logic readerChannel chan<- *list.List // list of *types.RecordAndContext @@ -220,7 +219,7 @@ func NewJSONCommentEnabledReader( readerChannel chan<- *list.List, // list of *types.RecordAndContext ) *JSONCommentEnabledReader { return &JSONCommentEnabledReader{ - lineScanner: bufio.NewScanner(underlying), + lineReader: NewLineReader(underlying, "\n"), readerOptions: readerOptions, context: types.NewNilContext(), readerChannel: readerChannel, @@ -237,10 +236,10 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) { // Loop until we can get a non-comment line to pass on, or end of file. for { // EOF - if !bsr.lineScanner.Scan() { + if !bsr.lineReader.Scan() { return 0, io.EOF } - line := bsr.lineScanner.Text() + line := bsr.lineReader.Text() // Non-comment line if !strings.HasPrefix(line, bsr.readerOptions.CommentString) { diff --git a/pkg/input/record_reader_pprint.go b/pkg/input/record_reader_pprint.go index 7495a8d80e..3fa9cd6dab 100644 --- a/pkg/input/record_reader_pprint.go +++ b/pkg/input/record_reader_pprint.go @@ -148,9 +148,9 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle( reader.headerStrings = nil recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel) diff --git a/pkg/input/record_reader_tsv.go b/pkg/input/record_reader_tsv.go index a0d77aec42..635dc08407 100644 --- a/pkg/input/record_reader_tsv.go +++ b/pkg/input/record_reader_tsv.go @@ -126,9 +126,9 @@ func (reader *RecordReaderTSV) processHandle( reader.headerStrings = nil recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel) diff --git a/pkg/input/record_reader_xtab.go b/pkg/input/record_reader_xtab.go index 74d8dac417..e683294cbd 100644 --- a/pkg/input/record_reader_xtab.go +++ b/pkg/input/record_reader_xtab.go @@ -1,7 +1,6 @@ package input import ( - "bufio" "container/list" "fmt" "io" @@ -105,10 +104,10 @@ func (reader *RecordReaderXTAB) processHandle( recordsPerBatch := reader.recordsPerBatch // XTAB uses repeated IFS, rather than IRS, to delimit records - lineScanner := NewLineScanner(handle, reader.readerOptions.IFS) + lineReader := NewLineReader(handle, reader.readerOptions.IFS) stanzasChannel := make(chan *list.List, recordsPerBatch) - go channelizedStanzaScanner(lineScanner, reader.readerOptions, stanzasChannel, downstreamDoneChannel, + go channelizedStanzaScanner(lineReader, reader.readerOptions, stanzasChannel, downstreamDoneChannel, recordsPerBatch) for { @@ -137,7 +136,7 @@ func (reader *RecordReaderXTAB) processHandle( // start or end of file. A single stanza, once parsed, will become a single // record. func channelizedStanzaScanner( - lineScanner *bufio.Scanner, + lineReader ILineReader, readerOptions *cli.TReaderOptions, stanzasChannel chan<- *list.List, // list of list of string downstreamDoneChannel <-chan bool, // for mlr head @@ -150,8 +149,8 @@ func channelizedStanzaScanner( stanzas := list.New() stanza := newStanza() - for lineScanner.Scan() { - line := lineScanner.Text() + for lineReader.Scan() { + line := lineReader.Text() // Check for comments-in-data feature // TODO: function-pointer this away diff --git a/pkg/input/splitters.go b/pkg/input/splitters.go new file mode 100644 index 0000000000..aa3e43b596 --- /dev/null +++ b/pkg/input/splitters.go @@ -0,0 +1,77 @@ +// This file contains the interface for file-format-specific record-readers, as +// well as a collection of utility functions. + +package input + +import ( + "regexp" + "strings" + + "github.com/johnkerl/miller/pkg/cli" + "github.com/johnkerl/miller/pkg/lib" +) + +// IPairSplitter splits a string into left and right, e.g. for IPS. +// This helps us reuse code for splitting by IPS string, or IPS regex. +type iPairSplitter interface { + Split(input string) []string +} + +func newPairSplitter(options *cli.TReaderOptions) iPairSplitter { + if options.IPSRegex == nil { + return &tIPSSplitter{ips: options.IPS} + } else { + return &tIPSRegexSplitter{ipsRegex: options.IPSRegex} + } +} + +type tIPSSplitter struct { + ips string +} + +func (s *tIPSSplitter) Split(input string) []string { + return strings.SplitN(input, s.ips, 2) +} + +type tIPSRegexSplitter struct { + ipsRegex *regexp.Regexp +} + +func (s *tIPSRegexSplitter) Split(input string) []string { + return lib.RegexCompiledSplitString(s.ipsRegex, input, 2) +} + +// IFieldSplitter splits a string into pieces, e.g. for IFS. +// This helps us reuse code for splitting by IFS string, or IFS regex. +type iFieldSplitter interface { + Split(input string) []string +} + +func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter { + if options.IFSRegex == nil { + return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS} + } else { + return &tIFSRegexSplitter{ifsRegex: options.IFSRegex} + } +} + +type tIFSSplitter struct { + ifs string + allowRepeatIFS bool +} + +func (s *tIFSSplitter) Split(input string) []string { + fields := lib.SplitString(input, s.ifs) + if s.allowRepeatIFS { + fields = lib.StripEmpties(fields) // left/right trim + } + return fields +} + +type tIFSRegexSplitter struct { + ifsRegex *regexp.Regexp +} + +func (s *tIFSRegexSplitter) Split(input string) []string { + return lib.RegexCompiledSplitString(s.ifsRegex, input, -1) +}