Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[db] Add an option to use db writes in index mode #1596

Merged
merged 10 commits into from
May 2, 2019
8 changes: 8 additions & 0 deletions scripts/development/m3_stack/m3dbnode.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
db:
transforms:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops will revert this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

truncateBy: block
truncateDuration: 1h

index:
forwardIndexThreshold: 0.1
forwardIndexProbability: 0.1

logging:
level: info

Expand Down
44 changes: 43 additions & 1 deletion src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/environment"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/x/config/hostid"
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
Expand Down Expand Up @@ -70,6 +71,9 @@ type DBConfiguration struct {
// Index configuration.
Index IndexConfiguration `yaml:"index"`

// Transforms configuration.
Transforms TransformConfiguration `yaml:"transforms"`

// Logging configuration.
Logging xlog.Configuration `yaml:"logging"`

Expand Down Expand Up @@ -165,6 +169,10 @@ func (c *DBConfiguration) InitDefaultsAndValidate() error {
return err
}

if err := c.Transforms.Validate(); err != nil {
return err
}

return nil
}

Expand All @@ -173,8 +181,42 @@ type IndexConfiguration struct {
// MaxQueryIDsConcurrency controls the maximum number of outstanding QueryID
// requests that can be serviced concurrently. Limiting the concurrency is
// important to prevent index queries from overloading the database entirely
// as they are very CPU-intensive (regex and FST matching.)
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

// ForwardIndexProbability determines the likelihood that an incoming write is
// written to the next block, when arriving close to the block boundary.
//
// NB: this is an optimization which lessens pressure on the index around
// block boundaries by eagerly writing the series to the next block
// preemptively.
ForwardIndexProbability float64 `yaml:"forwardIndexProbability" validate:"min=0.0,max=1.0"`

// ForwardIndexThreshold determines the threshold for forward writes, as a
// fraction of the given namespace's bufferFuture.
//
// NB: this is an optimization which lessens pressure on the index around
// block boundaries by eagerly writing the series to the next block
// preemptively.
ForwardIndexThreshold float64 `yaml:"forwardIndexThreshold" validate:"min=0.0,max=1.0"`
}

// TransformConfiguration contains configuration options that can transform
// incoming writes.
type TransformConfiguration struct {
// TruncateBy determines what type of truncatation is applied to incoming
// writes.
TruncateBy series.TruncateType `yaml:"truncateBy"`
// ForcedValue determines what to set all incoming write values to.
ForcedValue *float64 `yaml:"forceValue"`
}

func (c *TransformConfiguration) Validate() error {
if c == nil {
return nil
}

return c.TruncateBy.Validate()
}

// TickConfiguration is the tick configuration for background processing of
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
forwardIndexProbability: 0
forwardIndexThreshold: 0
transforms:
truncateBy: 0
forceValue: null
logging:
file: /var/log/m3dbnode.log
level: info
Expand Down
16 changes: 14 additions & 2 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func Run(runOpts RunOptions) {
CacheRegexp: plCacheConfig.CacheRegexpOrDefault(),
CacheTerms: plCacheConfig.CacheTermsOrDefault(),
})
opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
runtimeOpts = runtimeOpts.
Expand Down Expand Up @@ -1281,6 +1280,17 @@ func withEncodingAndPoolingOptions(
aggregateQueryResultsPool := index.NewAggregateResultsPool(
poolOptions(policy.IndexResultsPool, scope.SubScope("index-aggregate-results-pool")))

// Set value transformation options.
opts = opts.SetTruncateType(cfg.Transforms.TruncateBy)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe move this above the // Set index options since this transforms values? i.e. maybe just comment with // Set value transformations?

forcedValue := cfg.Transforms.ForcedValue
if forcedValue != nil {
opts = opts.SetWriteTransformOptions(series.WriteTransformOptions{
ForceValueEnabled: true,
ForceValue: *forcedValue,
})
}

// Set index options.
indexOpts := opts.IndexOptions().
SetInstrumentOptions(iopts).
SetMemSegmentOptions(
Expand All @@ -1297,7 +1307,9 @@ func withEncodingAndPoolingOptions(
SetIdentifierPool(identifierPool).
SetCheckedBytesPool(bytesPool).
SetQueryResultsPool(queryResultsPool).
SetAggregateResultsPool(aggregateQueryResultsPool)
SetAggregateResultsPool(aggregateQueryResultsPool).
SetForwardIndexProbability(cfg.Index.ForwardIndexProbability).
SetForwardIndexThreshold(cfg.Index.ForwardIndexThreshold)

queryResultsPool.Init(func() index.QueryResults {
// NB(r): Need to initialize after setting the index opts so
Expand Down
56 changes: 56 additions & 0 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions src/dbnode/storage/index/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func init() {

// nolint: maligned
type opts struct {
forwardIndexThreshold float64
forwardIndexProbability float64
insertMode InsertMode
clockOpts clock.Options
instrumentOpts instrument.Options
Expand Down Expand Up @@ -379,3 +381,23 @@ func (o *opts) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Opt
func (o *opts) ReadThroughSegmentOptions() ReadThroughSegmentOptions {
return o.readThroughSegmentOptions
}

func (o *opts) SetForwardIndexProbability(value float64) Options {
opts := *o
opts.forwardIndexProbability = value
return &opts
}

func (o *opts) ForwardIndexProbability() float64 {
return o.forwardIndexProbability
}

func (o *opts) SetForwardIndexThreshold(value float64) Options {
opts := *o
opts.forwardIndexThreshold = value
return &opts
}

func (o *opts) ForwardIndexThreshold() float64 {
return o.forwardIndexThreshold
}
13 changes: 13 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,4 +850,17 @@ type Options interface {

// ReadThroughSegmentOptions returns the read through segment cache options.
ReadThroughSegmentOptions() ReadThroughSegmentOptions

// SetForwardIndexProbability sets the probability chance for forward writes.
SetForwardIndexProbability(value float64) Options

// ForwardIndexProbability returns the probability chance for forward writes.
ForwardIndexProbability() float64

// SetForwardIndexProbability sets the threshold for forward writes as a
// fraction of the bufferFuture.
SetForwardIndexThreshold(value float64) Options

// ForwardIndexProbability returns the threshold for forward writes.
ForwardIndexThreshold() float64
}
10 changes: 8 additions & 2 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,11 @@ func (n *dbNamespace) Write(
n.metrics.write.ReportError(n.nowFn().Sub(callStart))
return ts.Series{}, false, err
}
opts := series.WriteOptions{
TruncateType: n.opts.TruncateType(),
}
series, wasWritten, err := shard.Write(ctx, id, timestamp,
value, unit, annotation, series.WriteOptions{})
value, unit, annotation, opts)
n.metrics.write.ReportSuccessOrError(err, n.nowFn().Sub(callStart))
return series, wasWritten, err
}
Expand All @@ -616,8 +619,11 @@ func (n *dbNamespace) WriteTagged(
n.metrics.writeTagged.ReportError(n.nowFn().Sub(callStart))
return ts.Series{}, false, err
}
opts := series.WriteOptions{
TruncateType: n.opts.TruncateType(),
}
series, wasWritten, err := shard.WriteTagged(ctx, id, tags, timestamp,
value, unit, annotation, series.WriteOptions{})
value, unit, annotation, opts)
n.metrics.writeTagged.ReportSuccessOrError(err, n.nowFn().Sub(callStart))
return series, wasWritten, err
}
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/storage/namespace/index_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ func (i *indexOpts) SetBlockSize(value time.Duration) IndexOptions {
func (i *indexOpts) BlockSize() time.Duration {
return i.blockSize
}

Loading