Skip to content

Commit

Permalink
[dbnode] Introduce Aggregator type (#2840)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Nov 11, 2020
1 parent 553771e commit 00e3304
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/dbnode/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ type StorageOptions struct {
TChanNodeServerFn node.NewTChanNodeServerFn
BackgroundProcessFns []storage.NewBackgroundProcessFn
NamespaceHooks storage.NamespaceHooks
NewTileAggregatorFn storage.NewTileAggregatorFn
}
5 changes: 5 additions & 0 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,11 @@ func Run(runOpts RunOptions) {
opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks)
}

if runOpts.StorageOptions.NewTileAggregatorFn != nil {
aggregator := runOpts.StorageOptions.NewTileAggregatorFn(iopts)
opts = opts.SetTileAggregator(aggregator)
}

// Set bootstrap options - We need to create a topology map provider from the
// same topology that will be passed to the cluster so that when we make
// bootstrapping decisions they are in sync with the clustered database
Expand Down
25 changes: 25 additions & 0 deletions src/dbnode/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ type options struct {
wideBatchSize int
newBackgroundProcessFns []NewBackgroundProcessFn
namespaceHooks NamespaceHooks
tileAggregator TileAggregator
}

// NewOptions creates a new set of storage options with defaults
Expand Down Expand Up @@ -252,6 +253,7 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options {
mediatorTickInterval: defaultMediatorTickInterval,
wideBatchSize: defaultWideBatchSize,
namespaceHooks: &noopNamespaceHooks{},
tileAggregator: &noopTileAggregator{},
}
return o.SetEncodingM3TSZPooled()
}
Expand Down Expand Up @@ -891,6 +893,17 @@ func (o *options) NamespaceHooks() NamespaceHooks {
return o.namespaceHooks
}

func (o *options) SetTileAggregator(value TileAggregator) Options {
opts := *o
opts.tileAggregator = value

return &opts
}

func (o *options) TileAggregator() TileAggregator {
return o.tileAggregator
}

type noOpColdFlush struct{}

func (n *noOpColdFlush) ColdFlushNamespace(Namespace) (OnColdFlushNamespace, error) {
Expand All @@ -902,3 +915,15 @@ type noopNamespaceHooks struct{}
func (h *noopNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error {
return nil
}

type noopTileAggregator struct{}

func (a *noopTileAggregator) AggregateTiles(
opts AggregateTilesOptions,
ns Namespace,
shardID uint32,
readers []fs.DataFileSetReader,
writer fs.StreamingWriter,
) (int64, error) {
return 0, nil
}
66 changes: 66 additions & 0 deletions src/dbnode/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,12 @@ type Options interface {

// NamespaceHooks returns the NamespaceHooks.
NamespaceHooks() NamespaceHooks

// SetTileAggregator sets the TileAggregator.
SetTileAggregator(aggregator TileAggregator) Options

// TileAggregator returns the TileAggregator.
TileAggregator() TileAggregator
}

// MemoryTracker tracks memory.
Expand Down Expand Up @@ -1369,6 +1375,21 @@ type AggregateTilesOptions struct {
InsOptions instrument.Options
}

// TileAggregator is the interface for AggregateTiles.
type TileAggregator interface {
// AggregateTiles does tile aggregation.
AggregateTiles(
opts AggregateTilesOptions,
ns Namespace,
shardID uint32,
readers []fs.DataFileSetReader,
writer fs.StreamingWriter,
) (int64, error)
}

// NewTileAggregatorFn creates a new TileAggregator.
type NewTileAggregatorFn func(iOpts instrument.Options) TileAggregator

// NamespaceHooks allows dynamic plugging into the namespace lifecycle.
type NamespaceHooks interface {
// OnCreatedNamespace gets invoked after each namespace is created.
Expand Down

0 comments on commit 00e3304

Please sign in to comment.