diff --git a/src/dbnode/server/options.go b/src/dbnode/server/options.go index fa2cc58c2a..ba082f20bc 100644 --- a/src/dbnode/server/options.go +++ b/src/dbnode/server/options.go @@ -32,4 +32,5 @@ type StorageOptions struct { TChanNodeServerFn node.NewTChanNodeServerFn BackgroundProcessFns []storage.NewBackgroundProcessFn NamespaceHooks storage.NamespaceHooks + NewTileAggregatorFn storage.NewTileAggregatorFn } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 31be18cfb0..0ea674252d 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -870,6 +870,11 @@ func Run(runOpts RunOptions) { opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks) } + if runOpts.StorageOptions.NewTileAggregatorFn != nil { + aggregator := runOpts.StorageOptions.NewTileAggregatorFn(iopts) + opts = opts.SetTileAggregator(aggregator) + } + // Set bootstrap options - We need to create a topology map provider from the // same topology that will be passed to the cluster so that when we make // bootstrapping decisions they are in sync with the clustered database diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index f5c79e9890..2eda82f7b4 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -177,6 +177,7 @@ type options struct { wideBatchSize int newBackgroundProcessFns []NewBackgroundProcessFn namespaceHooks NamespaceHooks + tileAggregator TileAggregator } // NewOptions creates a new set of storage options with defaults @@ -252,6 +253,7 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { mediatorTickInterval: defaultMediatorTickInterval, wideBatchSize: defaultWideBatchSize, namespaceHooks: &noopNamespaceHooks{}, + tileAggregator: &noopTileAggregator{}, } return o.SetEncodingM3TSZPooled() } @@ -891,6 +893,17 @@ func (o *options) NamespaceHooks() NamespaceHooks { return o.namespaceHooks } +func (o *options) SetTileAggregator(value TileAggregator) Options { + opts := *o + opts.tileAggregator = value + + return &opts +} + +func (o *options) TileAggregator() TileAggregator { + return o.tileAggregator +} + type noOpColdFlush struct{} func (n *noOpColdFlush) ColdFlushNamespace(Namespace) (OnColdFlushNamespace, error) { @@ -902,3 +915,15 @@ type noopNamespaceHooks struct{} func (h *noopNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error { return nil } + +type noopTileAggregator struct{} + +func (a *noopTileAggregator) AggregateTiles( + opts AggregateTilesOptions, + ns Namespace, + shardID uint32, + readers []fs.DataFileSetReader, + writer fs.StreamingWriter, +) (int64, error) { + return 0, nil +} diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 87a3c33efa..386e0beb13 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -4959,6 +4959,34 @@ func (mr *MockOptionsMockRecorder) NamespaceHooks() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NamespaceHooks", reflect.TypeOf((*MockOptions)(nil).NamespaceHooks)) } +// SetTileAggregator mocks base method +func (m *MockOptions) SetTileAggregator(aggregator TileAggregator) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetTileAggregator", aggregator) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetTileAggregator indicates an expected call of SetTileAggregator +func (mr *MockOptionsMockRecorder) SetTileAggregator(aggregator interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTileAggregator", reflect.TypeOf((*MockOptions)(nil).SetTileAggregator), aggregator) +} + +// TileAggregator mocks base method +func (m *MockOptions) TileAggregator() TileAggregator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TileAggregator") + ret0, _ := ret[0].(TileAggregator) + return ret0 +} + +// TileAggregator indicates an expected call of TileAggregator +func (mr *MockOptionsMockRecorder) TileAggregator() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TileAggregator", reflect.TypeOf((*MockOptions)(nil).TileAggregator)) +} + // MockMemoryTracker is a mock of MemoryTracker interface type MockMemoryTracker struct { ctrl *gomock.Controller @@ -5046,6 +5074,44 @@ func (mr *MockMemoryTrackerMockRecorder) WaitForDec() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForDec", reflect.TypeOf((*MockMemoryTracker)(nil).WaitForDec)) } +// MockTileAggregator is a mock of TileAggregator interface +type MockTileAggregator struct { + ctrl *gomock.Controller + recorder *MockTileAggregatorMockRecorder +} + +// MockTileAggregatorMockRecorder is the mock recorder for MockTileAggregator +type MockTileAggregatorMockRecorder struct { + mock *MockTileAggregator +} + +// NewMockTileAggregator creates a new mock instance +func NewMockTileAggregator(ctrl *gomock.Controller) *MockTileAggregator { + mock := &MockTileAggregator{ctrl: ctrl} + mock.recorder = &MockTileAggregatorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTileAggregator) EXPECT() *MockTileAggregatorMockRecorder { + return m.recorder +} + +// AggregateTiles mocks base method +func (m *MockTileAggregator) AggregateTiles(opts AggregateTilesOptions, ns Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", opts, ns, shardID, readers, writer) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockTileAggregatorMockRecorder) AggregateTiles(opts, ns, shardID, readers, writer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), opts, ns, shardID, readers, writer) +} + // MockNamespaceHooks is a mock of NamespaceHooks interface type MockNamespaceHooks struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 3e28e7f31d..b9a521e785 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -1298,6 +1298,12 @@ type Options interface { // NamespaceHooks returns the NamespaceHooks. NamespaceHooks() NamespaceHooks + + // SetTileAggregator sets the TileAggregator. + SetTileAggregator(aggregator TileAggregator) Options + + // TileAggregator returns the TileAggregator. + TileAggregator() TileAggregator } // MemoryTracker tracks memory. @@ -1369,6 +1375,21 @@ type AggregateTilesOptions struct { InsOptions instrument.Options } +// TileAggregator is the interface for AggregateTiles. +type TileAggregator interface { + // AggregateTiles does tile aggregation. + AggregateTiles( + opts AggregateTilesOptions, + ns Namespace, + shardID uint32, + readers []fs.DataFileSetReader, + writer fs.StreamingWriter, + ) (int64, error) +} + +// NewTileAggregatorFn creates a new TileAggregator. +type NewTileAggregatorFn func(iOpts instrument.Options) TileAggregator + // NamespaceHooks allows dynamic plugging into the namespace lifecycle. type NamespaceHooks interface { // OnCreatedNamespace gets invoked after each namespace is created.