diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index b1481c1c34..df35e4dd80 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -95,8 +95,6 @@ var ( type nsIndex struct { state nsIndexState - extendedRetentionPeriod time.Duration - // all the vars below this line are not modified past the ctor // and don't require a lock when being accessed. nowFn clock.NowFn @@ -658,7 +656,7 @@ func (i *nsIndex) writeBatches( blockSize = i.blockSize futureLimit = now.Add(1 * i.bufferFuture) pastLimit = now.Add(-1 * i.bufferPast) - earliestBlockStartToRetain = i.earliestBlockStartToRetainWithLock(now) + earliestBlockStartToRetain = retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, now) batchOptions = batch.Options() forwardIndexDice = i.forwardIndexDice forwardIndexEnabled = forwardIndexDice.enabled @@ -866,7 +864,10 @@ func (i *nsIndex) Bootstrapped() bool { } func (i *nsIndex) Tick(c context.Cancellable, startTime time.Time) (namespaceIndexTickResult, error) { - var result namespaceIndexTickResult + var ( + result = namespaceIndexTickResult{} + earliestBlockStartToRetain = retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, startTime) + ) i.state.Lock() defer func() { @@ -874,8 +875,6 @@ func (i *nsIndex) Tick(c context.Cancellable, startTime time.Time) (namespaceInd i.state.Unlock() }() - earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(startTime) - result.NumBlocks = int64(len(i.state.blocksByTime)) var multiErr xerrors.MultiError @@ -1033,7 +1032,7 @@ func (i *nsIndex) flushableBlocks( flushable := make([]index.Block, 0, len(i.state.blocksByTime)) now := i.nowFn() - earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(now) + earliestBlockStartToRetain := retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, now) currentBlockStart := now.Truncate(i.blockSize) // Check for flushable blocks by iterating through all block starts w/in retention. for blockStart := earliestBlockStartToRetain; blockStart.Before(currentBlockStart); blockStart = blockStart.Add(i.blockSize) { @@ -1973,7 +1972,7 @@ func (i *nsIndex) CleanupExpiredFileSets(t time.Time) error { } // earliest block to retain based on retention period - earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(t) + earliestBlockStartToRetain := retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, t) // now we loop through the blocks we hold, to ensure we don't delete any data for them. for t := range i.state.blocksByTime { @@ -2182,28 +2181,6 @@ func (i *nsIndex) unableToAllocBlockInvariantError(err error) error { return ierr } -func (i *nsIndex) SetExtendedRetentionPeriod(period time.Duration) { - i.state.Lock() - defer i.state.Unlock() - - if period > i.extendedRetentionPeriod { - i.extendedRetentionPeriod = period - } -} - -func (i *nsIndex) effectiveRetentionPeriodWithLock() time.Duration { - period := i.retentionPeriod - if i.extendedRetentionPeriod > period { - period = i.extendedRetentionPeriod - } - - return period -} - -func (i *nsIndex) earliestBlockStartToRetainWithLock(t time.Time) time.Time { - return retention.FlushTimeStartForRetentionPeriod(i.effectiveRetentionPeriodWithLock(), i.blockSize, t) -} - type nsIndexMetrics struct { asyncInsertAttemptTotal tally.Counter asyncInsertAttemptSkip tally.Counter diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go index b04bb5b82c..8ba96d87bf 100644 --- a/src/dbnode/storage/index_test.go +++ b/src/dbnode/storage/index_test.go @@ -378,24 +378,6 @@ func TestNamespaceIndexQueryNoMatchingBlocks(t *testing.T) { require.NoError(t, err) } -func TestNamespaceIndexSetExtendedRetentionPeriod(t *testing.T) { - ctrl := gomock.NewController(xtest.Reporter{T: t}) - defer ctrl.Finish() - - idx := newTestIndex(t, ctrl).index.(*nsIndex) - originalRetention := idx.retentionPeriod - - assert.Equal(t, originalRetention, idx.effectiveRetentionPeriodWithLock()) - - longerRetention := originalRetention + time.Minute - idx.SetExtendedRetentionPeriod(longerRetention) - assert.Equal(t, longerRetention, idx.effectiveRetentionPeriodWithLock()) - - shorterRetention := longerRetention - time.Second - idx.SetExtendedRetentionPeriod(shorterRetention) - assert.Equal(t, longerRetention, idx.effectiveRetentionPeriodWithLock()) -} - func verifyFlushForShards( t *testing.T, ctrl *gomock.Controller, diff --git a/src/dbnode/storage/readonly_index_proxy.go b/src/dbnode/storage/readonly_index_proxy.go deleted file mode 100644 index fec2bc8eea..0000000000 --- a/src/dbnode/storage/readonly_index_proxy.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storage - -import ( - "errors" - "time" - - "github.com/m3db/m3/src/dbnode/persist" - "github.com/m3db/m3/src/dbnode/sharding" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" - "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/dbnode/ts/writes" - "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/ident" - xtime "github.com/m3db/m3/src/x/time" -) - -var errNamespaceIndexReadOnly = errors.New("write operation on read only namespace index") - -type readOnlyIndexProxy struct { - underlying NamespaceIndex -} - -func (r readOnlyIndexProxy) AssignShardSet(shardSet sharding.ShardSet) {} - -func (r readOnlyIndexProxy) BlockStartForWriteTime(writeTime time.Time) xtime.UnixNano { - return r.underlying.BlockStartForWriteTime(writeTime) -} - -func (r readOnlyIndexProxy) BlockForBlockStart(blockStart time.Time) (index.Block, error) { - return r.underlying.BlockForBlockStart(blockStart) -} - -func (r readOnlyIndexProxy) WriteBatch(batch *index.WriteBatch) error { - return errNamespaceIndexReadOnly -} - -func (r readOnlyIndexProxy) WritePending(pending []writes.PendingIndexInsert) error { - return errNamespaceIndexReadOnly -} - -func (r readOnlyIndexProxy) Query( - ctx context.Context, - query index.Query, - opts index.QueryOptions, -) (index.QueryResult, error) { - return r.underlying.Query(ctx, query, opts) -} - -func (r readOnlyIndexProxy) AggregateQuery( - ctx context.Context, - query index.Query, - opts index.AggregationOptions, -) (index.AggregateQueryResult, error) { - return r.underlying.AggregateQuery(ctx, query, opts) -} - -func (r readOnlyIndexProxy) WideQuery( - ctx context.Context, - query index.Query, - collector chan *ident.IDBatch, - opts index.WideQueryOptions, -) error { - return r.underlying.WideQuery(ctx, query, collector, opts) -} - -func (r readOnlyIndexProxy) Bootstrap(bootstrapResults result.IndexResults) error { - return nil -} - -func (r readOnlyIndexProxy) Bootstrapped() bool { - return r.underlying.Bootstrapped() -} - -func (r readOnlyIndexProxy) CleanupExpiredFileSets(t time.Time) error { - return nil -} - -func (r readOnlyIndexProxy) CleanupDuplicateFileSets() error { - return nil -} - -func (r readOnlyIndexProxy) Tick(c context.Cancellable, startTime time.Time) (namespaceIndexTickResult, error) { - return namespaceIndexTickResult{}, nil -} - -func (r readOnlyIndexProxy) WarmFlush(flush persist.IndexFlush, shards []databaseShard) error { - return nil -} - -func (r readOnlyIndexProxy) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) { - return noopOnColdFlushDone, nil -} - -func (r readOnlyIndexProxy) SetExtendedRetentionPeriod(period time.Duration) { - r.underlying.SetExtendedRetentionPeriod(period) -} - -func (r readOnlyIndexProxy) DebugMemorySegments(opts DebugMemorySegmentsOptions) error { - return r.underlying.DebugMemorySegments(opts) -} - -func (r readOnlyIndexProxy) Close() error { - return nil -} - -// NewReadOnlyIndexProxy builds a new NamespaceIndex that proxies only read -// operations, and no-ops on write operations. -func NewReadOnlyIndexProxy(underlying NamespaceIndex) NamespaceIndex { - return readOnlyIndexProxy{underlying: underlying} -} - -func noopOnColdFlushDone() error { - return nil -} diff --git a/src/dbnode/storage/readonly_index_proxy_test.go b/src/dbnode/storage/readonly_index_proxy_test.go deleted file mode 100644 index ea3518370d..0000000000 --- a/src/dbnode/storage/readonly_index_proxy_test.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storage - -import ( - "errors" - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/ident" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" -) - -func TestReadOnlyIndexProxyReject(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - idx := NewMockNamespaceIndex(ctrl) - roIdx := NewReadOnlyIndexProxy(idx) - - assert.Equal(t, errNamespaceIndexReadOnly, roIdx.WriteBatch(nil)) - assert.Equal(t, errNamespaceIndexReadOnly, roIdx.WritePending(nil)) -} - -func TestReadOnlyIndexProxySuppress(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - idx := NewMockNamespaceIndex(ctrl) - roIdx := NewReadOnlyIndexProxy(idx) - - roIdx.AssignShardSet(nil) - - assert.NoError(t, roIdx.Bootstrap(nil)) - - assert.NoError(t, roIdx.CleanupExpiredFileSets(time.Now())) - - assert.NoError(t, roIdx.CleanupDuplicateFileSets()) - - res, err := roIdx.Tick(nil, time.Now()) - assert.Equal(t, namespaceIndexTickResult{}, res) - assert.NoError(t, err) - - assert.NoError(t, roIdx.WarmFlush(nil, nil)) - - _, err = roIdx.ColdFlush(nil) - assert.NoError(t, err) - - assert.NoError(t, roIdx.Close()) -} - -func TestReadOnlyIndexProxyDelegate(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - idx := NewMockNamespaceIndex(ctrl) - roIdx := NewReadOnlyIndexProxy(idx) - - now := time.Now().Truncate(time.Hour) - later := xtime.ToUnixNano(now.Add(time.Hour)) - testErr := errors.New("test error") - - idx.EXPECT().BlockStartForWriteTime(now).Return(later) - assert.Equal(t, later, roIdx.BlockStartForWriteTime(now)) - - block := index.NewMockBlock(ctrl) - idx.EXPECT().BlockForBlockStart(now).Return(block, testErr) - res, err := roIdx.BlockForBlockStart(now) - assert.Equal(t, testErr, err) - assert.Equal(t, block, res) - - ctx := context.NewContext() - query := index.Query{} - queryOpts := index.QueryOptions{} - queryRes := index.QueryResult{} - - idx.EXPECT().Query(ctx, query, queryOpts).Return(queryRes, testErr) - qRes, err := roIdx.Query(ctx, query, queryOpts) - assert.Equal(t, testErr, err) - assert.Equal(t, queryRes, qRes) - - aggOpts := index.AggregationOptions{} - aggRes := index.AggregateQueryResult{} - idx.EXPECT().AggregateQuery(ctx, query, aggOpts).Return(aggRes, testErr) - aRes, err := roIdx.AggregateQuery(ctx, query, aggOpts) - assert.Equal(t, testErr, err) - assert.Equal(t, aggRes, aRes) - - wideOpts := index.WideQueryOptions{} - ch := make(chan *ident.IDBatch) - idx.EXPECT().WideQuery(ctx, query, ch, wideOpts).Return(testErr) - err = roIdx.WideQuery(ctx, query, ch, wideOpts) - assert.Equal(t, testErr, err) - close(ch) - - idx.EXPECT().Bootstrapped().Return(true) - assert.True(t, roIdx.Bootstrapped()) - - idx.EXPECT().SetExtendedRetentionPeriod(time.Minute) - roIdx.SetExtendedRetentionPeriod(time.Minute) - - debugOpts := DebugMemorySegmentsOptions{} - idx.EXPECT().DebugMemorySegments(debugOpts).Return(testErr) - assert.Equal(t, testErr, roIdx.DebugMemorySegments(debugOpts)) -} diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ff27a2feeb..3a3ef7e947 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/storage/types.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -2661,18 +2661,6 @@ func (mr *MockNamespaceIndexMockRecorder) ColdFlush(shards interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlush", reflect.TypeOf((*MockNamespaceIndex)(nil).ColdFlush), shards) } -// SetExtendedRetentionPeriod mocks base method -func (m *MockNamespaceIndex) SetExtendedRetentionPeriod(period time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetExtendedRetentionPeriod", period) -} - -// SetExtendedRetentionPeriod indicates an expected call of SetExtendedRetentionPeriod -func (mr *MockNamespaceIndexMockRecorder) SetExtendedRetentionPeriod(period interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetExtendedRetentionPeriod", reflect.TypeOf((*MockNamespaceIndex)(nil).SetExtendedRetentionPeriod), period) -} - // DebugMemorySegments mocks base method func (m *MockNamespaceIndex) DebugMemorySegments(opts DebugMemorySegmentsOptions) error { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index afdbde4492..be916df8f3 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -778,9 +778,6 @@ type NamespaceIndex interface { // cold flushing completes to perform houskeeping. ColdFlush(shards []databaseShard) (OnColdFlushDone, error) - // SetExtendedRetentionPeriod allows to extend index retention beyond the retention of the namespace it belongs to. - SetExtendedRetentionPeriod(period time.Duration) - // DebugMemorySegments allows for debugging memory segments. DebugMemorySegments(opts DebugMemorySegmentsOptions) error