Merge remote-tracking branch 'upstream/master' into pull/374-review
Signed-off-by: Krasi Georgiev <[email protected]>
Krasi Georgiev committed Jan 17, 2019
2 parents ad9e3ed + 3929359 commit 441ea45
Showing 5 changed files with 507 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
+- [FEATURE] Add new `LiveReader` to the WAL package, added to allow live tailing of a WAL segment, as used by Prometheus Remote Write after the refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean there will never be more data to read.

## 0.3.1
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.
2 changes: 1 addition & 1 deletion compact.go
@@ -191,7 +191,7 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return res, nil
}

-// Compact any blocks that have >5% tombstones.
+// Compact any blocks with a big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
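// A sketch, not part of this commit: the ">5% tombstones" check that follows
// in this loop might look like the code below; the Stats field names
// (NumTombstones, NumSeries) are assumptions based on context, not confirmed
// by this diff.
//
//	if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
//		return []string{dms[i].dir}, nil
//	}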
2 changes: 1 addition & 1 deletion db_test.go
@@ -1432,7 +1432,7 @@ func TestNoEmptyBlocks(t *testing.T) {
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
-_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) // ?????????????
+_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction-3, 0) // ?????????????
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
245 changes: 225 additions & 20 deletions wal/wal.go
@@ -832,28 +832,13 @@ func (r *Reader) next() (err error) {
}
r.rec = append(r.rec, buf[:length]...)

-switch r.curRecTyp {
-case recFull:
-if i != 0 {
-return errors.New("unexpected full record")
-}
-return nil
-case recFirst:
-if i != 0 {
-return errors.New("unexpected first record")
-}
-case recMiddle:
-if i == 0 {
-return errors.New("unexpected middle record")
-}
-case recLast:
-if i == 0 {
-return errors.New("unexpected last record")
-}
+if err := validateRecord(r.curRecTyp, i); err != nil {
+return err
+}
+if r.curRecTyp == recLast || r.curRecTyp == recFull {
return nil
-default:
-return errors.Errorf("unexpected record type %d", r.curRecTyp)
}

// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
@@ -904,6 +889,226 @@ func (r *Reader) Offset() int64 {
return r.total
}

// NewLiveReader returns a new live reader.
func NewLiveReader(r io.Reader) *LiveReader {
return &LiveReader{rdr: r}
}

// LiveReader reads WAL records from an io.Reader. It buffers partial record data for
// the next read.
type LiveReader struct {
rdr io.Reader
err error
rec []byte
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
writeIndex int // Index in buf to start at for next write.
total int64 // Total bytes processed during reading in calls to Next().
index int // Used to track partial records, should be 0 at the start of every new record.
}

// Err returns the last error encountered by the reader.
func (r *LiveReader) Err() error {
return r.err
}

// TotalRead returns the total number of bytes processed by the reader during calls to Next().
func (r *LiveReader) TotalRead() int64 {
return r.total
}

// fillBuffer reads from the underlying io.Reader into the unused tail of the internal buffer, advancing writeIndex.
func (r *LiveReader) fillBuffer() error {
n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
r.writeIndex += n
return err
}

// shiftBuffer moves the unread bytes between readIndex and writeIndex to the
// front of the buffer and resets both indexes accordingly.
func (r *LiveReader) shiftBuffer() {
copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex])
r.readIndex = 0
r.writeIndex = copied
}
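// A sketch, not part of this commit: one fill/consume/shift cycle over the
// same kind of fixed-size buffer, illustrating the invariant the two helpers
// above maintain: 0 <= readIndex <= writeIndex <= pageSize. demoSource is a
// hypothetical stand-in for the segment file.
func demoFillShift(demoSource io.Reader) {
	var buf [pageSize]byte
	readIndex, writeIndex := 0, 0
	n, _ := demoSource.Read(buf[writeIndex:]) // fill: append after any unread bytes
	writeIndex += n
	readIndex = writeIndex / 2 // consume: pretend half the buffered bytes were processed
	copied := copy(buf[0:], buf[readIndex:writeIndex]) // shift: move the unread tail to the front
	readIndex, writeIndex = 0, copied // the next fill would again append at writeIndex
}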

// Next returns true if r.rec will contain a full record.
// A return of false does not mean there will never be more data to
// read for the current io.Reader; the segment may still be appended to.
func (r *LiveReader) Next() bool {
for {
if r.buildRecord() {
return true
}
if r.err != nil && r.err != io.EOF {
return false
}
if r.readIndex == pageSize {
r.shiftBuffer()
}
if r.writeIndex != pageSize {
if err := r.fillBuffer(); err != nil {
// We expect to get EOF, since we're reading the segment file as it's being written.
if err != io.EOF {
r.err = err
}
return false
}
}
}
}
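// A sketch, not part of this commit: how a caller might tail a live segment
// with LiveReader. The path, poll interval, and process callback are
// hypothetical, for illustration only.
func exampleTail(path string, process func([]byte)) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	lr := NewLiveReader(f)
	for {
		// Consume every record that is currently available.
		for lr.Next() {
			// The slice is only valid until the next call to Next,
			// so process must copy it if it retains it.
			process(lr.Record())
		}
		if err := lr.Err(); err != nil && err != io.EOF {
			return err // A real error; io.EOF just means no more data yet.
		}
		// Next returned false without a real error: wait for the writer
		// to append more data, then try again.
		time.Sleep(100 * time.Millisecond)
	}
}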

// Record returns the current record.
// The returned byte slice is only valid until the next call to Next.
func (r *LiveReader) Record() []byte {
return r.rec
}

// buildRecord attempts to rebuild a full record from potentially partial
// records. Returns false if there was an error or if we were unable to read a
// record for any reason, and true if we read a full record. Any record data
// is appended to LiveReader.rec.
func (r *LiveReader) buildRecord() bool {
for {
// Check that we have data in the internal buffer to read.
if r.writeIndex <= r.readIndex {
return false
}

// Attempt to read a record, partial or otherwise.
temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total)
r.readIndex += n
r.total += int64(n)
if err != nil {
r.err = err
return false
}

if temp == nil {
return false
}

rt := recType(r.hdr[0])

if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
}
r.rec = append(r.rec, temp...)

if err := validateRecord(rt, r.index); err != nil {
r.err = err
r.index = 0
return false
}
if rt == recLast || rt == recFull {
r.index = 0
return true
}
// Only increment r.index for partial records, since we use it
// to determine valid content record sequences.
r.index++
}
}

// validateRecord returns an error if the recType and i indicate an invalid
// record sequence. As an example, if i > 0 because we've read some amount of
// a partial record (recFirst, recMiddle, etc. but not recLast) and we then
// get another recFirst or recFull instead of a recLast or recMiddle, we have
// an invalid record sequence.
func validateRecord(typ recType, i int) error {
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record, dropping buffer")
}
return nil
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record, dropping buffer")
}
return nil
case recLast:
if i == 0 {
return errors.New("unexpected last record, dropping buffer")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
}
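// A sketch, not part of this commit: the sequences validateRecord accepts,
// shown as the (type, index) pairs it sees while a record is reassembled.
//
//	validateRecord(recFull, 0)   // ok: record arrived in a single fragment
//	validateRecord(recFirst, 0)  // ok: start of a fragmented record
//	validateRecord(recMiddle, 1) // ok: continuation, index already advanced
//	validateRecord(recLast, 2)   // ok: final fragment completes the record
//	validateRecord(recFull, 1)   // error: a new record began before recLast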

// readRecord reads a sub-record (see recType) from the buffer. It could
// potentially be a full record (recFull) if the record fits within the bounds
// of a single page. Returns a byte slice of the record data read, the number
// of bytes read, and an error if there is a non-zero byte in a page term
// record or if the record checksum fails.
// TODO(callum) the EOF errors we're returning from this function should
// theoretically never happen; add a metric for them.
func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) {
readIndex := 0
header[0] = buf[0]
readIndex++
total++

// The rest of this function is mostly from Reader.Next().
typ := recType(header[0])
// Gobble up zero bytes.
if typ == recPageTerm {
// We are pedantic and check whether the zeros are actually up to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (total % pageSize)
if k == pageSize {
return nil, 1, nil // Initial 0 byte was last page byte.
}

if k <= int64(len(buf)-readIndex) {
for _, v := range buf[readIndex : int64(readIndex)+k] {
readIndex++
if v != 0 {
return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes")
}
}
return nil, readIndex, nil
}
// Not enough bytes to read the rest of the page term record.
// This should theoretically never happen, since we only shift the
// live reader's internal buffer once we have read to the end of a page.
// Treat this the same as an EOF; it is an error we would expect to see.
return nil, 0, io.EOF
}

if readIndex+recordHeaderSize-1 > len(buf) {
// Treat this the same as an EOF; it is an error we would expect to see.
return nil, 0, io.EOF
}

copy(header[1:], buf[readIndex:readIndex+len(header[1:])])
readIndex += recordHeaderSize - 1
total += int64(recordHeaderSize - 1)
var (
length = binary.BigEndian.Uint16(header[1:])
crc = binary.BigEndian.Uint32(header[3:])
)
readTo := int(length) + readIndex
if readTo > len(buf) {
if (readTo - readIndex) > pageSize {
return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length))
}
// Not enough data to read all of the record data.
// Treat this the same as an EOF; it is an error we would expect to see.
return nil, 0, io.EOF
}
recData := buf[readIndex:readTo]
readIndex += int(length)
total += int64(length)

// TODO(callum) what should we do here? Throw out the record? We should at least add a metric.
if c := crc32.Checksum(recData, castagnoliTable); c != crc {
return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
return recData, readIndex, nil
}
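// A sketch, not part of this commit: the 7-byte header layout that readRecord
// decodes above, seen from the writer's side. encodeRecordHeader is a
// hypothetical helper; recordHeaderSize is assumed to be 7 based on the
// offsets used in readRecord.
func encodeRecordHeader(header []byte, typ recType, data []byte) {
	header[0] = byte(typ)                                     // 1 byte: record type
	binary.BigEndian.PutUint16(header[1:], uint16(len(data))) // 2 bytes: payload length
	crc := crc32.Checksum(data, castagnoliTable)              // CRC32 (Castagnoli) of the payload
	binary.BigEndian.PutUint32(header[3:], crc)               // 4 bytes: checksum
}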

func min(i, j int) int {
if i < j {
return i