Skip to content

Commit

Permalink
[large-tiles] Cross block reader changes plus Cross block iterator (#…
Browse files Browse the repository at this point in the history
…2612)

* [dbnode] Add OrderedByIndex option for DataFileSetReader.Open (#2465)

* [dbnode] Cross-block series reader (#2481)

* [dbnode] AggregateTiles RPC - minimal E2E flow (#2466)

* [large-tiles] Cross block reader changes plus Cross block iterator

* Fix TestReadAggregateWrite

* Fix TestShardAggregateTiles

* Dummy

* Fix TestReadAggregateWrite

* typo

* Convert spaces to tabs in rpc.thrift

* Change capitalization of source/target namespace

* Fix copyright year in large_tiles_test.go

* Remove a noop

* PR feedback

* PR 2617 feedback

* PR feedback

* More PR feedback

* Fix unit test

* Reuse read objects during aggregation

* Renamed StreamingMode to StreamingEnabled

* PR feedback

* Indentation

* Address PR feedback

* Skip flaky TestReadAggregateWrite

Co-authored-by: arnikola <[email protected]>
  • Loading branch information
linasm and arnikola authored Sep 15, 2020
1 parent 8bcf5ac commit 3aa2f51
Show file tree
Hide file tree
Showing 10 changed files with 584 additions and 245 deletions.
6 changes: 5 additions & 1 deletion src/dbnode/integration/large_tiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package integration

import (
"github.com/stretchr/testify/assert"
"testing"
"time"

Expand All @@ -35,12 +34,17 @@ import (
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/zap"
)

func TestReadAggregateWrite(t *testing.T) {
//FIXME
t.Skip("Appears to be flaky, reenable after https://github.com/m3db/m3/pull/2599 is merged")

var (
blockSize = 2 * time.Hour
indexBlockSize = 2 * blockSize
Expand Down
94 changes: 94 additions & 0 deletions src/dbnode/persist/fs/cross_block_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fs

import (
"bytes"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/ts"
xtime "github.com/m3db/m3/src/x/time"
)

type crossBlockIterator struct {
idx int
exhausted bool
current encoding.ReaderIterator
byteReader *bytes.Reader
records []BlockRecord
}

// NewCrossBlockIterator creates a new CrossBlockIterator.
func NewCrossBlockIterator(pool encoding.ReaderIteratorPool) CrossBlockIterator {
c := &crossBlockIterator{current: pool.Get(), byteReader: bytes.NewReader(nil)}
c.Reset(nil)
return c
}

func (c *crossBlockIterator) Next() bool {
if c.exhausted {
return false
}

if c.idx >= 0 && c.current.Next() {
return true
}

// NB: clear previous.
if c.idx >= 0 {
if c.current.Err() != nil {
c.exhausted = true
return false
}
}

c.idx++
if c.idx >= len(c.records) {
c.exhausted = true
return false
}

c.byteReader.Reset(c.records[c.idx].Data)
c.current.Reset(c.byteReader, nil)

// NB: rerun using the next record.
return c.Next()
}

func (c *crossBlockIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
return c.current.Current()
}

func (c *crossBlockIterator) Reset(records []BlockRecord) {
c.idx = -1
c.records = records
c.exhausted = false
c.byteReader.Reset(nil)
}

func (c *crossBlockIterator) Close() {
c.Reset(nil)
c.current.Close()
}

func (c *crossBlockIterator) Err() error {
return c.current.Err()
}
177 changes: 177 additions & 0 deletions src/dbnode/persist/fs/cross_block_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fs

import (
"fmt"
"io"
"io/ioutil"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/ts"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCrossBlockIterator(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

reader := encoding.NewMockReaderIterator(ctrl)

iterPool := encoding.NewMockReaderIteratorPool(ctrl)
iterPool.EXPECT().Get().Return(reader)

iter := NewCrossBlockIterator(iterPool)
assert.False(t, iter.Next())

count := 3
iterCount := 5
startTime := time.Now().Truncate(time.Hour)
start := startTime
records := make([]BlockRecord, 0, count)
for i := 0; i < count; i++ {
byteString := fmt.Sprint(i)
records = append(records, BlockRecord{
Data: []byte(byteString),
})

reader.EXPECT().Reset(gomock.Any(), nil).Do(
func(r io.Reader, _ namespace.SchemaDescr) {
b, err := ioutil.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, byteString, string(b))
})

for j := 0; j < iterCount; j++ {
reader.EXPECT().Next().Return(true)
reader.EXPECT().Current().Return(ts.Datapoint{
Value: float64(j),
Timestamp: start,
}, xtime.Second, nil)
start = start.Add(time.Minute)
}

reader.EXPECT().Next().Return(false)
reader.EXPECT().Err().Return(nil)
}

iter.Reset(records)
i := 0
for iter.Next() {
dp, _, _ := iter.Current()
// NB: iterator values should go [0,1,...,iterCount] for each block record.
assert.Equal(t, float64(i%iterCount), dp.Value)
// NB: time should be constantly increasing per value.
assert.Equal(t, startTime.Add(time.Minute*time.Duration(i)), dp.Timestamp)
i++
}

assert.Equal(t, count*iterCount, i)

reader.EXPECT().Err().Return(errExpected)
assert.Equal(t, errExpected, iter.Err())
reader.EXPECT().Close()
iter.Close()
}

func TestFailingCrossBlockIterator(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

reader := encoding.NewMockReaderIterator(ctrl)

iterPool := encoding.NewMockReaderIteratorPool(ctrl)
iterPool.EXPECT().Get().Return(reader)

iter := NewCrossBlockIterator(iterPool)
assert.False(t, iter.Next())

count := 4
iterCount := 5
remaining := 12
startTime := time.Now().Truncate(time.Hour)
start := startTime
records := make([]BlockRecord, 0, count)
for i := 0; i < count; i++ {
byteString := fmt.Sprint(i)
data := []byte(byteString)

if remaining == 0 {
records = append(records, BlockRecord{
Data: data,
})
continue
}

records = append(records, BlockRecord{
Data: data,
})

reader.EXPECT().Reset(gomock.Any(), nil).Do(
func(r io.Reader, _ namespace.SchemaDescr) {
b, err := ioutil.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, byteString, string(b))
})

for j := 0; remaining > 0 && j < iterCount; j++ {
reader.EXPECT().Next().Return(true)
reader.EXPECT().Current().Return(ts.Datapoint{
Value: float64(j),
Timestamp: start,
}, xtime.Second, nil)
start = start.Add(time.Minute)
remaining--
}

reader.EXPECT().Next().Return(false)
if remaining == 0 {
reader.EXPECT().Err().Return(errExpected).Times(2)
} else {
reader.EXPECT().Err().Return(nil)
}
}

iter.Reset(records)
i := 0
for iter.Next() {
dp, _, _ := iter.Current()
// NB: iterator values should go [0,1,...,iterCount] for each block record.
assert.Equal(t, float64(i%iterCount), dp.Value)
// NB: time should be constantly increasing per value.
assert.Equal(t, startTime.Add(time.Minute*time.Duration(i)), dp.Timestamp)
i++
}

assert.Equal(t, 12, i)

assert.Equal(t, errExpected, iter.Err())
reader.EXPECT().Close()
iter.Close()
}
Loading

0 comments on commit 3aa2f51

Please sign in to comment.