From c3ce03cb3e88bbf225196f907c0eae40a54d207e Mon Sep 17 00:00:00 2001 From: John Kerl Date: Sat, 24 Feb 2024 21:52:22 -0500 Subject: [PATCH] new ILineReader/TLineReader --- pkg/input/line_reader.go | 73 ++++++++++++++++++++++++++-- pkg/input/record_reader_csvlite.go | 4 +- pkg/input/record_reader_dkvp_nidx.go | 4 +- pkg/input/record_reader_json.go | 8 +-- pkg/input/record_reader_pprint.go | 4 +- pkg/input/record_reader_tsv.go | 4 +- pkg/input/record_reader_xtab.go | 11 ++--- 7 files changed, 85 insertions(+), 23 deletions(-) diff --git a/pkg/input/line_reader.go b/pkg/input/line_reader.go index 98347d4d4a..a1cb7645e9 100644 --- a/pkg/input/line_reader.go +++ b/pkg/input/line_reader.go @@ -14,8 +14,63 @@ type ILineReader interface { Text() string } -// NewLineScanner handles read lines which may be delimited by multi-line separators, +type TLineReader struct { + scanner *bufio.Scanner +} + +// NewLineReader handles reading lines which may be delimited by multi-line separators, // e.g. "\xe2\x90\x9e" for USV. +func NewLineReader(handle io.Reader, irs string) *TLineReader { + scanner := bufio.NewScanner(handle) + + if irs == "\n" || irs == "\r\n" { + // Handled by default scanner. + } else { + irsbytes := []byte(irs) + irslen := len(irsbytes) + + // Custom splitter + recordSplitter := func( + data []byte, + atEOF bool, + ) ( + advance int, + token []byte, + err error, + ) { + datalen := len(data) + end := datalen - irslen + for i := 0; i <= end; i++ { + if data[i] == irsbytes[0] { + match := true + for j := 1; j < irslen; j++ { + if data[i+j] != irsbytes[j] { + match = false + break + } + } + if match { + return i + irslen, data[:i], nil + } + } + } + if !atEOF { + return 0, nil, nil + } + // There is one final token to be delivered, which may be the empty string. + // Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this + // but does not trigger an error to be returned from Scan itself. + return 0, data, bufio.ErrFinalToken + } + + scanner.Split(recordSplitter) + } + + return &TLineReader{ + scanner: scanner, + } +} + func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner { scanner := bufio.NewScanner(handle) @@ -66,12 +121,20 @@ func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner { return scanner } +func (r *TLineReader) Scan() bool { + return r.scanner.Scan() +} + +func (r *TLineReader) Text() string { + return r.scanner.Text() +} + // TODO: comment copiously // // Lines are written to the channel with their trailing newline (or whatever // IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n". -func channelizedLineScanner( - lineScanner *bufio.Scanner, +func channelizedLineReader( + lineReader ILineReader, linesChannel chan<- *list.List, downstreamDoneChannel <-chan bool, // for mlr head recordsPerBatch int64, @@ -81,10 +144,10 @@ func channelizedLineScanner( lines := list.New() - for lineScanner.Scan() { + for lineReader.Scan() { i++ - lines.PushBack(lineScanner.Text()) + lines.PushBack(lineReader.Text()) // See if downstream processors will be ignoring further data (e.g. mlr // head). If so, stop reading. This makes 'mlr head hugefile' exit diff --git a/pkg/input/record_reader_csvlite.go b/pkg/input/record_reader_csvlite.go index 3664d6ea33..bfc1887196 100644 --- a/pkg/input/record_reader_csvlite.go +++ b/pkg/input/record_reader_csvlite.go @@ -144,9 +144,9 @@ func (reader *RecordReaderCSVLite) processHandle( reader.headerStrings = nil recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel) diff --git a/pkg/input/record_reader_dkvp_nidx.go b/pkg/input/record_reader_dkvp_nidx.go index 5cd92f77d0..943fbcb5e7 100644 --- a/pkg/input/record_reader_dkvp_nidx.go +++ b/pkg/input/record_reader_dkvp_nidx.go @@ -101,9 +101,9 @@ func (reader *RecordReaderDKVPNIDX) processHandle( context.UpdateForStartOfFile(filename) recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.getRecordBatch(linesChannel, errorChannel, context) diff --git a/pkg/input/record_reader_json.go b/pkg/input/record_reader_json.go index 27b9b8e2c9..97af200e85 100644 --- a/pkg/input/record_reader_json.go +++ b/pkg/input/record_reader_json.go @@ -203,7 +203,7 @@ func (reader *RecordReaderJSON) processHandle( // JSONCommentEnabledReader implements io.Reader to strip comment lines // off of CSV data. type JSONCommentEnabledReader struct { - lineScanner *bufio.Scanner + lineReader *bufio.Scanner readerOptions *cli.TReaderOptions context *types.Context // Needed for channelized stdout-printing logic readerChannel chan<- *list.List // list of *types.RecordAndContext @@ -220,7 +220,7 @@ func NewJSONCommentEnabledReader( readerChannel chan<- *list.List, // list of *types.RecordAndContext ) *JSONCommentEnabledReader { return &JSONCommentEnabledReader{ - lineScanner: bufio.NewScanner(underlying), + lineReader: bufio.NewScanner(underlying), readerOptions: readerOptions, context: types.NewNilContext(), readerChannel: readerChannel, @@ -237,10 +237,10 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) { // Loop until we can get a non-comment line to pass on, or end of file. for { // EOF - if !bsr.lineScanner.Scan() { + if !bsr.lineReader.Scan() { return 0, io.EOF } - line := bsr.lineScanner.Text() + line := bsr.lineReader.Text() // Non-comment line if !strings.HasPrefix(line, bsr.readerOptions.CommentString) { diff --git a/pkg/input/record_reader_pprint.go b/pkg/input/record_reader_pprint.go index 7495a8d80e..3fa9cd6dab 100644 --- a/pkg/input/record_reader_pprint.go +++ b/pkg/input/record_reader_pprint.go @@ -148,9 +148,9 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle( reader.headerStrings = nil recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel) diff --git a/pkg/input/record_reader_tsv.go b/pkg/input/record_reader_tsv.go index a0d77aec42..635dc08407 100644 --- a/pkg/input/record_reader_tsv.go +++ b/pkg/input/record_reader_tsv.go @@ -126,9 +126,9 @@ func (reader *RecordReaderTSV) processHandle( reader.headerStrings = nil recordsPerBatch := reader.recordsPerBatch - lineScanner := NewLineScanner(handle, reader.readerOptions.IRS) + lineReader := NewLineReader(handle, reader.readerOptions.IRS) linesChannel := make(chan *list.List, recordsPerBatch) - go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch) + go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch) for { recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel) diff --git a/pkg/input/record_reader_xtab.go b/pkg/input/record_reader_xtab.go index 74d8dac417..e683294cbd 100644 --- a/pkg/input/record_reader_xtab.go +++ b/pkg/input/record_reader_xtab.go @@ -1,7 +1,6 @@ package input import ( - "bufio" "container/list" "fmt" "io" @@ -105,10 +104,10 @@ func (reader *RecordReaderXTAB) processHandle( recordsPerBatch := reader.recordsPerBatch // XTAB uses repeated IFS, rather than IRS, to delimit records - lineScanner := NewLineScanner(handle, reader.readerOptions.IFS) + lineReader := NewLineReader(handle, reader.readerOptions.IFS) stanzasChannel := make(chan *list.List, recordsPerBatch) - go channelizedStanzaScanner(lineScanner, reader.readerOptions, stanzasChannel, downstreamDoneChannel, + go channelizedStanzaScanner(lineReader, reader.readerOptions, stanzasChannel, downstreamDoneChannel, recordsPerBatch) for { @@ -137,7 +136,7 @@ func (reader *RecordReaderXTAB) processHandle( // start or end of file. A single stanza, once parsed, will become a single // record. func channelizedStanzaScanner( - lineScanner *bufio.Scanner, + lineReader ILineReader, readerOptions *cli.TReaderOptions, stanzasChannel chan<- *list.List, // list of list of string downstreamDoneChannel <-chan bool, // for mlr head @@ -150,8 +149,8 @@ func channelizedStanzaScanner( stanzas := list.New() stanza := newStanza() - for lineScanner.Scan() { - line := lineScanner.Text() + for lineReader.Scan() { + line := lineReader.Text() // Check for comments-in-data feature // TODO: function-pointer this away