From 62c90dd6010d0ee6f4cf80d865387295b3616c79 Mon Sep 17 00:00:00 2001
From: John Kerl
Date: Sat, 24 Feb 2024 21:39:43 -0500
Subject: [PATCH] Split up pkg/input/record_reader.go

---
 pkg/input/constants.go     |   3 +
 pkg/input/line_reader.go   | 113 ++++++++++++++++++++++++
 pkg/input/record_reader.go | 171 -------------------------------------
 pkg/input/splitters.go     |  77 +++++++++++++++++
 4 files changed, 193 insertions(+), 171 deletions(-)
 create mode 100644 pkg/input/constants.go
 create mode 100644 pkg/input/line_reader.go
 create mode 100644 pkg/input/splitters.go

diff --git a/pkg/input/constants.go b/pkg/input/constants.go
new file mode 100644
index 0000000000..42030c3eb1
--- /dev/null
+++ b/pkg/input/constants.go
@@ -0,0 +1,3 @@
+package input
+
+const CSV_BOM = "\xef\xbb\xbf"
diff --git a/pkg/input/line_reader.go b/pkg/input/line_reader.go
new file mode 100644
index 0000000000..98347d4d4a
--- /dev/null
+++ b/pkg/input/line_reader.go
@@ -0,0 +1,113 @@
+// This file contains the ILineReader interface, as well as utilities for
+// scanning input into lines and batching those lines into a channel.
+
+package input
+
+import (
+	"bufio"
+	"container/list"
+	"io"
+)
+
+type ILineReader interface {
+	Scan() bool
+	Text() string
+}
+
+// NewLineScanner handles reading lines which may be delimited by multi-byte separators,
+// e.g. "\xe2\x90\x9e" for USV.
+func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
+	scanner := bufio.NewScanner(handle)
+
+	// Handled by default scanner.
+	if irs == "\n" || irs == "\r\n" {
+		return scanner
+	}
+
+	irsbytes := []byte(irs)
+	irslen := len(irsbytes)
+
+	// Custom splitter
+	recordSplitter := func(
+		data []byte,
+		atEOF bool,
+	) (
+		advance int,
+		token []byte,
+		err error,
+	) {
+		datalen := len(data)
+		end := datalen - irslen
+		for i := 0; i <= end; i++ {
+			if data[i] == irsbytes[0] {
+				match := true
+				for j := 1; j < irslen; j++ {
+					if data[i+j] != irsbytes[j] {
+						match = false
+						break
+					}
+				}
+				if match {
+					return i + irslen, data[:i], nil
+				}
+			}
+		}
+		if !atEOF {
+			return 0, nil, nil
+		}
+		// There is one final token to be delivered, which may be the empty string.
+		// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
+		// but does not trigger an error to be returned from Scan itself.
+		return 0, data, bufio.ErrFinalToken
+	}
+
+	scanner.Split(recordSplitter)
+
+	return scanner
+}
+
+// channelizedLineScanner reads lines from lineScanner and sends them, in batches, to linesChannel.
+//
+// Lines are written to the channel with their trailing newline (or whatever
+// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
+func channelizedLineScanner(
+	lineScanner *bufio.Scanner,
+	linesChannel chan<- *list.List,
+	downstreamDoneChannel <-chan bool, // for mlr head
+	recordsPerBatch int64,
+) {
+	i := int64(0)
+	done := false
+
+	lines := list.New()
+
+	for lineScanner.Scan() {
+		i++
+
+		lines.PushBack(lineScanner.Text())
+
+		// See if downstream processors will be ignoring further data (e.g. mlr
+		// head). If so, stop reading. This makes 'mlr head hugefile' exit
+		// quickly, as it should.
+		if i%recordsPerBatch == 0 {
+			select {
+			case <-downstreamDoneChannel:
+				done = true
+				break
+			default:
+				break
+			}
+			if done {
+				break
+			}
+			linesChannel <- lines
+			lines = list.New()
+		}
+
+		if done {
+			break
+		}
+	}
+	linesChannel <- lines
+	close(linesChannel) // end-of-stream marker
+}
diff --git a/pkg/input/record_reader.go b/pkg/input/record_reader.go
index 096060e629..62a411f222 100644
--- a/pkg/input/record_reader.go
+++ b/pkg/input/record_reader.go
@@ -4,19 +4,11 @@
 package input
 
 import (
-	"bufio"
 	"container/list"
-	"io"
-	"regexp"
-	"strings"
 
-	"github.com/johnkerl/miller/pkg/cli"
-	"github.com/johnkerl/miller/pkg/lib"
 	"github.com/johnkerl/miller/pkg/types"
 )
 
-const CSV_BOM = "\xef\xbb\xbf"
-
 // Since Go is concurrent, the context struct (AWK-like variables such as
 // FILENAME, NF, NR, FNR, etc.) needs to be duplicated and passed through the
 // channels along with each record. Hence the initial context, which readers
@@ -32,166 +24,3 @@ type IRecordReader interface {
 		downstreamDoneChannel <-chan bool, // for mlr head
 	)
 }
-
-// NewLineScanner handles read lines which may be delimited by multi-line separators,
-// e.g. "\xe2\x90\x9e" for USV.
-func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
-	scanner := bufio.NewScanner(handle)
-
-	// Handled by default scanner.
-	if irs == "\n" || irs == "\r\n" {
-		return scanner
-	}
-
-	irsbytes := []byte(irs)
-	irslen := len(irsbytes)
-
-	// Custom splitter
-	recordSplitter := func(
-		data []byte,
-		atEOF bool,
-	) (
-		advance int,
-		token []byte,
-		err error,
-	) {
-		datalen := len(data)
-		end := datalen - irslen
-		for i := 0; i <= end; i++ {
-			if data[i] == irsbytes[0] {
-				match := true
-				for j := 1; j < irslen; j++ {
-					if data[i+j] != irsbytes[j] {
-						match = false
-						break
-					}
-				}
-				if match {
-					return i + irslen, data[:i], nil
-				}
-			}
-		}
-		if !atEOF {
-			return 0, nil, nil
-		}
-		// There is one final token to be delivered, which may be the empty string.
-		// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
-		// but does not trigger an error to be returned from Scan itself.
-		return 0, data, bufio.ErrFinalToken
-	}
-
-	scanner.Split(recordSplitter)
-
-	return scanner
-}
-
-// TODO: comment copiously
-//
-// Lines are written to the channel with their trailing newline (or whatever
-// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
-func channelizedLineScanner(
-	lineScanner *bufio.Scanner,
-	linesChannel chan<- *list.List,
-	downstreamDoneChannel <-chan bool, // for mlr head
-	recordsPerBatch int64,
-) {
-	i := int64(0)
-	done := false
-
-	lines := list.New()
-
-	for lineScanner.Scan() {
-		i++
-
-		lines.PushBack(lineScanner.Text())
-
-		// See if downstream processors will be ignoring further data (e.g. mlr
-		// head). If so, stop reading. This makes 'mlr head hugefile' exit
-		// quickly, as it should.
-		if i%recordsPerBatch == 0 {
-			select {
-			case _ = <-downstreamDoneChannel:
-				done = true
-				break
-			default:
-				break
-			}
-			if done {
-				break
-			}
-			linesChannel <- lines
-			lines = list.New()
-		}
-
-		if done {
-			break
-		}
-	}
-	linesChannel <- lines
-	close(linesChannel) // end-of-stream marker
-}
-
-// IPairSplitter splits a string into left and right, e.g. for IPS.
-// This helps us reuse code for splitting by IPS string, or IPS regex.
-type iPairSplitter interface {
-	Split(input string) []string
-}
-
-func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
-	if options.IPSRegex == nil {
-		return &tIPSSplitter{ips: options.IPS}
-	} else {
-		return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
-	}
-}
-
-type tIPSSplitter struct {
-	ips string
-}
-
-func (s *tIPSSplitter) Split(input string) []string {
-	return strings.SplitN(input, s.ips, 2)
-}
-
-type tIPSRegexSplitter struct {
-	ipsRegex *regexp.Regexp
-}
-
-func (s *tIPSRegexSplitter) Split(input string) []string {
-	return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
-}
-
-// IFieldSplitter splits a string into pieces, e.g. for IFS.
-// This helps us reuse code for splitting by IFS string, or IFS regex.
-type iFieldSplitter interface {
-	Split(input string) []string
-}
-
-func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
-	if options.IFSRegex == nil {
-		return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
-	} else {
-		return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
-	}
-}
-
-type tIFSSplitter struct {
-	ifs            string
-	allowRepeatIFS bool
-}
-
-func (s *tIFSSplitter) Split(input string) []string {
-	fields := lib.SplitString(input, s.ifs)
-	if s.allowRepeatIFS {
-		fields = lib.StripEmpties(fields) // left/right trim
-	}
-	return fields
-}
-
-type tIFSRegexSplitter struct {
-	ifsRegex *regexp.Regexp
-}
-
-func (s *tIFSRegexSplitter) Split(input string) []string {
-	return lib.RegexCompiledSplitString(s.ifsRegex, input, -1)
-}
diff --git a/pkg/input/splitters.go b/pkg/input/splitters.go
new file mode 100644
index 0000000000..aa3e43b596
--- /dev/null
+++ b/pkg/input/splitters.go
@@ -0,0 +1,77 @@
+// This file contains the pair-splitters (IPS) and field-splitters (IFS) used
+// by the file-format-specific record-readers.
+
+package input
+
+import (
+	"regexp"
+	"strings"
+
+	"github.com/johnkerl/miller/pkg/cli"
+	"github.com/johnkerl/miller/pkg/lib"
+)
+
+// IPairSplitter splits a string into left and right, e.g. for IPS.
+// This helps us reuse code for splitting by IPS string, or IPS regex.
+type iPairSplitter interface {
+	Split(input string) []string
+}
+
+func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
+	if options.IPSRegex == nil {
+		return &tIPSSplitter{ips: options.IPS}
+	} else {
+		return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
+	}
+}
+
+type tIPSSplitter struct {
+	ips string
+}
+
+func (s *tIPSSplitter) Split(input string) []string {
+	return strings.SplitN(input, s.ips, 2)
+}
+
+type tIPSRegexSplitter struct {
+	ipsRegex *regexp.Regexp
+}
+
+func (s *tIPSRegexSplitter) Split(input string) []string {
+	return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
+}
+
+// IFieldSplitter splits a string into pieces, e.g. for IFS.
+// This helps us reuse code for splitting by IFS string, or IFS regex.
+type iFieldSplitter interface { + Split(input string) []string +} + +func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter { + if options.IFSRegex == nil { + return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS} + } else { + return &tIFSRegexSplitter{ifsRegex: options.IFSRegex} + } +} + +type tIFSSplitter struct { + ifs string + allowRepeatIFS bool +} + +func (s *tIFSSplitter) Split(input string) []string { + fields := lib.SplitString(input, s.ifs) + if s.allowRepeatIFS { + fields = lib.StripEmpties(fields) // left/right trim + } + return fields +} + +type tIFSRegexSplitter struct { + ifsRegex *regexp.Regexp +} + +func (s *tIFSRegexSplitter) Split(input string) []string { + return lib.RegexCompiledSplitString(s.ifsRegex, input, -1) +}
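
As a companion to the custom bufio.SplitFunc introduced in line_reader.go
above, here is a minimal, self-contained sketch of the same record-splitting
technique. It is not part of the patch: the program, the makeSplitter helper,
and the sample input are illustrative only, and it substitutes bytes.HasPrefix
for the patch's hand-rolled byte comparison; the control flow otherwise
mirrors recordSplitter.

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// makeSplitter (hypothetical helper, not in the patch) returns a
// bufio.SplitFunc that tokenizes on an arbitrary multi-byte separator,
// mirroring recordSplitter in pkg/input/line_reader.go.
func makeSplitter(irs string) bufio.SplitFunc {
	irsbytes := []byte(irs)
	irslen := len(irsbytes)
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		// Scan for the first full occurrence of the separator.
		end := len(data) - irslen
		for i := 0; i <= end; i++ {
			if bytes.HasPrefix(data[i:], irsbytes) {
				// Separator found: consume it, deliver what precedes it.
				return i + irslen, data[:i], nil
			}
		}
		if !atEOF {
			// No separator yet; ask the Scanner for more data.
			return 0, nil, nil
		}
		// Deliver the final token (possibly empty) without making Scan error.
		return 0, data, bufio.ErrFinalToken
	}
}

func main() {
	const usvRS = "\xe2\x90\x9e" // UTF-8 encoding of U+241E, as in the patch comment
	input := "a=1,b=2" + usvRS + "c=3" + usvRS + "d=4"
	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Split(makeSplitter(usvRS))
	for scanner.Scan() {
		fmt.Printf("record: %q\n", scanner.Text())
	}
	// record: "a=1,b=2"
	// record: "c=3"
	// record: "d=4"
}

Each record arrives with the separator stripped, which is the behavior
channelizedLineScanner relies on. If the input ended with the separator, the
bufio.ErrFinalToken branch would deliver one final, empty token, matching the
"may be the empty string" comment in recordSplitter.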