Skip to content

Commit

Permalink
RecordReader: support regular expressions for text values
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Mar 31, 2022
1 parent dc3bbf9 commit 001c6d3
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 3 deletions.
66 changes: 63 additions & 3 deletions pkg/kgo/record_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -899,10 +900,14 @@ type RecordReader struct {
//
// Topics, keys, and values can be decoded using "base64" and "hex" formatting
// options. Any size specification is the size of the encoded value actually
// being read:
// being read.
//
// %t{hex}
// %v{base64}
// %T%t{hex} - 4abcd reads four hex characters "abcd"
// %V%v{base64} - 2z9 reads two base64 characters "z9"
//
// As well, these text options can be parsed with regular expressions:
//
// %k{re[\d*]}%v{re[\s+]}
//
func NewRecordReader(reader io.Reader, layout string) (*RecordReader, error) {
r := &RecordReader{r: bufio.NewReader(reader)}
Expand Down Expand Up @@ -1135,6 +1140,7 @@ func (r *RecordReader) parseReadLayout(layout string) error {

case 't', 'k', 'v':
var decodeFn func([]byte) ([]byte, error)
var re *regexp.Regexp
if handledBrace = isOpenBrace; handledBrace {
switch {
case strings.HasPrefix(layout, "base64}"):
Expand All @@ -1143,6 +1149,23 @@ func (r *RecordReader) parseReadLayout(layout string) error {
case strings.HasPrefix(layout, "hex}"):
decodeFn = decodeHex
layout = layout[len("hex}"):]
case strings.HasPrefix(layout, "re"):
restr, rem, err := nomOpenClose(layout[len("re"):])
if err != nil {
return fmt.Errorf("re parse err: %v", err)
}
if len(rem) == 0 || rem[0] != '}' {
return fmt.Errorf("re missing closing } in %q", layout)
}
layout = rem[1:]
if !strings.HasPrefix(restr, "^") {
restr = "^" + restr
}
re, err = regexp.Compile(restr)
if err != nil {
return fmt.Errorf("re parse err: %v", err)
}

default:
return fmt.Errorf("unknown %%%s{ escape", string(escaped))
}
Expand Down Expand Up @@ -1176,7 +1199,12 @@ func (r *RecordReader) parseReadLayout(layout string) error {
}}
bit.set(bit)
if bits.has(bitSize) {
if re != nil {
return errors.New("cannot specify exact size and regular expression")
}
fn.read = readKind{sizefn: func() int { return int(*size) }}
} else if re != nil {
fn.read = readKind{re: re}
}
r.fns = append(r.fns, fn)

Expand Down Expand Up @@ -1383,6 +1411,7 @@ type readKind struct {
sizefn func() int
handoff func(*RecordReader, *Record) error
delim []byte
re *regexp.Regexp
}

func (r *readKind) empty() bool {
Expand Down Expand Up @@ -1427,6 +1456,8 @@ func (r *RecordReader) next(rec *Record) error {
err = r.readSize(fn.read.sizefn())
case fn.read.handoff != nil:
err = fn.read.handoff(r, rec)
case fn.read.re != nil:
err = r.readRe(fn.read.re)
default:
err = r.readDelim(fn.read.delim) // we *always* fall back to delim parsing
}
Expand Down Expand Up @@ -1473,6 +1504,35 @@ func (r *RecordReader) readCondition(fn func(byte) bool) error {
}
}

// reReader feeds a RecordReader's buffered input to the regexp package one
// byte at a time via ReadRune. It only peeks at the input, never consuming
// it, and remembers everything peeked so far so that the caller can copy out
// and discard exactly the matched prefix afterwards.
type reReader struct {
	r    *RecordReader // reader whose buffered input is peeked
	peek []byte        // bytes returned by the most recent Peek
	err  error         // error returned by the most recent Peek, if any
}

// ReadRune peeks one byte further into the underlying reader than the
// previous call did and returns that newest byte as a rune of size 1.
//
// If the peek fails, the error is recorded in re.err and returned with a zero
// rune and zero size.
//
// NOTE(review): every byte is reported as its own rune, so multi-byte UTF-8
// sequences are not decoded before matching — confirm this is intended.
func (re *reReader) ReadRune() (r rune, size int, err error) {
	want := len(re.peek) + 1
	re.peek, re.err = re.r.r.Peek(want)
	if re.err == nil {
		// Peek succeeded, so re.peek holds exactly want bytes; the last
		// one is the byte not yet handed to the regexp engine.
		return rune(re.peek[want-1]), 1, nil
	}
	return 0, 0, re.err
}

// readRe reads the next text value by matching re against the unread input
// and appending the matched bytes to r.buf.
//
// The regexp is run over a reReader, which only peeks at the buffered input.
// Once a match is found, exactly the matched bytes are copied into r.buf and
// discarded from the underlying reader; anything peeked beyond the match is
// left in place for the next read.
//
// It returns:
//   - the peek error, if no match was found and peeking failed (for example
//     io.EOF on exhausted input);
//   - a mismatch error, if the input was readable but re did not match it;
//   - the peek error, if the match used every peeked byte and a peek error
//     was recorded while matching;
//   - nil otherwise.
func (r *RecordReader) readRe(re *regexp.Regexp) error {
	reader := reReader{r: r}
	loc := re.FindReaderIndex(&reader)
	if loc == nil {
		if reader.err != nil {
			return reader.err
		}
		// The regexp rejected the input without any read error. Nothing
		// has been consumed, so returning nil here would report a
		// successful zero-byte read and the caller would never advance.
		return fmt.Errorf("regexp %q did not match input %q", re.String(), reader.peek)
	}
	n := loc[1] // we ensure the regexp begins with ^, so we only need the end
	r.buf = append(r.buf, reader.peek[:n]...)
	// The n bytes were already peeked, so they are buffered and Discard does
	// not need to read from the underlying reader.
	r.r.Discard(n)
	if n == len(reader.peek) {
		// The match used everything that could be peeked; surface any error
		// (such as io.EOF) that was hit while peeking.
		return reader.err
	}
	return nil
}

func (r *RecordReader) readSize(n int) error {
r.buf = append(r.buf, make([]byte, n)...)
n, err := io.ReadFull(r.r, r.buf)
Expand Down
34 changes: 34 additions & 0 deletions pkg/kgo/record_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,40 @@ func TestRecordReader(t *testing.T) {
}},
},

{
layout: `%v{re[\d{2}]}`,
in: "2345",
exp: []*Record{
StringRecord("23"),
StringRecord("45"),
},
},

{
layout: `%v{re[(\d{2}|asdf)]}`,
in: "23asdf45",
exp: []*Record{
StringRecord("23"),
StringRecord("asdf"),
StringRecord("45"),
},
},

{
layout: `%v{re[(\d{2}|asdf)]}`,
in: "",
exp: []*Record{},
},

{
layout: `%K{3}%v{re[.*?\d]}%k`,
in: "abcdefg[1aaad2bbb",
exp: []*Record{
KeyStringRecord("aaa", "abcdefg[1"),
KeyStringRecord("bbb", "d2"),
},
},

//
} {
t.Run(test.layout, func(t *testing.T) {
Expand Down

0 comments on commit 001c6d3

Please sign in to comment.