Skip to content

Commit

Permalink
Implement the ability to write in batches in M3DB database (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardartoul authored Nov 16, 2018
1 parent e92da70 commit 87f9a5b
Show file tree
Hide file tree
Showing 37 changed files with 1,782 additions and 535 deletions.
15 changes: 15 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,23 @@ type CommitLogPolicy struct {
FlushEvery time.Duration `yaml:"flushEvery" validate:"nonzero"`

// The queue the commit log will keep in front of the current commit log segment.
// Modifying values in this policy will control how many pending writes can be
// in the commitlog queue before M3DB will begin rejecting writes.
Queue CommitLogQueuePolicy `yaml:"queue" validate:"nonzero"`

// The actual Golang channel that implements the commit log queue. We separate this
// from the Queue field for historical / legacy reasons. Generally speaking, the
// values in this config should not need to be modified, but we leave it in for
// tuning purposes. Unlike the Queue field, values in this policy control the size
// of the channel that backs the queue. Since writes to the commitlog are batched,
// setting the size of this policy will control how many batches can be queued, and
// indrectly how many writes can be queued, but that is dependent on the batch size
// of the client. As a result, we recommend that users avoid tuning this field and
// modify the Queue size instead which maps directly to the number of writes. This
// works in most cases because the default size of the QueueChannel should be large
// enough for almost all workloads assuming a reasonable batch size is used.
QueueChannel *CommitLogQueuePolicy `yaml:"queueChannel"`

// The commit log block size.
BlockSize time.Duration `yaml:"blockSize" validate:"nonzero"`
}
Expand Down
15 changes: 15 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ db:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
writeBatchPool:
initialBatchSize: 128
maxBatchSize: 100000
pool:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
identifierPool:
size: 9437184
lowWatermark: 0.01
Expand Down Expand Up @@ -389,6 +396,7 @@ db:
queue:
calculationType: fixed
size: 2097152
queueChannel: null
blockSize: 10m0s
repair:
enabled: false
Expand Down Expand Up @@ -522,6 +530,13 @@ db:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
writeBatchPool:
initialBatchSize: 128
maxBatchSize: 100000
pool:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
config:
service:
zone: embedded
Expand Down
95 changes: 56 additions & 39 deletions src/cmd/services/m3dbnode/config/pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,120 +34,123 @@ const (

// PoolingPolicy specifies the pooling policy.
type PoolingPolicy struct {
// The initial alloc size for a block
// The initial alloc size for a block.
BlockAllocSize int `yaml:"blockAllocSize"`

// The general pool type (currently only supported: simple).
Type PoolingType `yaml:"type"`

// The Bytes pool buckets to use
// The Bytes pool buckets to use.
BytesPool BucketPoolPolicy `yaml:"bytesPool"`

// The policy for the Closers pool
// The policy for the Closers pool.
ClosersPool PoolPolicy `yaml:"closersPool"`

// The policy for the Context pool
// The policy for the Context pool.
ContextPool ContextPoolPolicy `yaml:"contextPool"`

// The policy for the DatabaseSeries pool
// The policy for the DatabaseSeries pool.
SeriesPool PoolPolicy `yaml:"seriesPool"`

// The policy for the DatabaseBlock pool
// The policy for the DatabaseBlock pool.
BlockPool PoolPolicy `yaml:"blockPool"`

// The policy for the Encoder pool
// The policy for the Encoder pool.
EncoderPool PoolPolicy `yaml:"encoderPool"`

// The policy for the Iterator pool
// The policy for the Iterator pool.
IteratorPool PoolPolicy `yaml:"iteratorPool"`

// The policy for the Segment Reader pool
// The policy for the Segment Reader pool.
SegmentReaderPool PoolPolicy `yaml:"segmentReaderPool"`

// The policy for the Identifier pool
// The policy for the Identifier pool.
IdentifierPool PoolPolicy `yaml:"identifierPool"`

// The policy for the FetchBlockMetadataResult pool
// The policy for the FetchBlockMetadataResult pool.
FetchBlockMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlockMetadataResultsPool"`

// The policy for the FetchBlocksMetadataResults pool
// The policy for the FetchBlocksMetadataResults pool.
FetchBlocksMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlocksMetadataResultsPool"`

// The policy for the HostBlockMetadataSlice pool
// The policy for the HostBlockMetadataSlice pool.
HostBlockMetadataSlicePool CapacityPoolPolicy `yaml:"hostBlockMetadataSlicePool"`

// The policy for the BlockMetadat pool
// The policy for the BlockMetadat pool.
BlockMetadataPool PoolPolicy `yaml:"blockMetadataPool"`

// The policy for the BlockMetadataSlice pool
// The policy for the BlockMetadataSlice pool.
BlockMetadataSlicePool CapacityPoolPolicy `yaml:"blockMetadataSlicePool"`

// The policy for the BlocksMetadata pool
// The policy for the BlocksMetadata pool.
BlocksMetadataPool PoolPolicy `yaml:"blocksMetadataPool"`

// The policy for the BlocksMetadataSlice pool
// The policy for the BlocksMetadataSlice pool.
BlocksMetadataSlicePool CapacityPoolPolicy `yaml:"blocksMetadataSlicePool"`

// The policy for the tags pool
// The policy for the tags pool.
TagsPool MaxCapacityPoolPolicy `yaml:"tagsPool"`

// The policy for the tags iterator pool
// The policy for the tags iterator pool.
TagsIteratorPool PoolPolicy `yaml:"tagIteratorPool"`

// The policy for the index.ResultsPool
// The policy for the index.ResultsPool.
IndexResultsPool PoolPolicy `yaml:"indexResultsPool"`

// The policy for the TagEncoderPool
// The policy for the TagEncoderPool.
TagEncoderPool PoolPolicy `yaml:"tagEncoderPool"`

// The policy for the TagDecoderPool
// The policy for the TagDecoderPool.
TagDecoderPool PoolPolicy `yaml:"tagDecoderPool"`

// The policy for the WriteBatchPool.
WriteBatchPool WriteBatchPoolPolicy `yaml:"writeBatchPool"`
}

// PoolPolicy specifies a single pool policy.
type PoolPolicy struct {
// The size of the pool
// The size of the pool.
Size int `yaml:"size"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`
}

// CapacityPoolPolicy specifies a single pool policy that has a
// per element capacity.
type CapacityPoolPolicy struct {
// The size of the pool
// The size of the pool.
Size int `yaml:"size"`

// The capacity of items in the pool
// The capacity of items in the pool.
Capacity int `yaml:"capacity"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`
}

// MaxCapacityPoolPolicy specifies a single pool policy that has a
// per element capacity, and a maximum allowed capacity as well.
type MaxCapacityPoolPolicy struct {
// The size of the pool
// The size of the pool.
Size int `yaml:"size"`

// The capacity of items in the pool
// The capacity of items in the pool.
Capacity int `yaml:"capacity"`

// The max capacity of items in the pool
// The max capacity of items in the pool.
MaxCapacity int `yaml:"maxCapacity"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`
}

Expand All @@ -157,15 +160,15 @@ type BucketPoolPolicy struct {
Buckets []CapacityPoolPolicy `yaml:"buckets"`
}

// ContextPoolPolicy specifies the policy for the context pool
// ContextPoolPolicy specifies the policy for the context pool.
type ContextPoolPolicy struct {
// The size of the pool
Size int `yaml:"size"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`

// The maximum allowable size for a slice of finalizers that the
Expand All @@ -175,7 +178,21 @@ type ContextPoolPolicy struct {
MaxFinalizerCapacity int `yaml:"maxFinalizerCapacity" validate:"min=0"`
}

// PoolPolicy returns the PoolPolicy that is represented by the ContextPoolPolicy
// WriteBatchPoolPolicy specifies the pooling policy for the WriteBatch pool.
type WriteBatchPoolPolicy struct {
// InitialBatchSize controls the initial batch size for each WriteBatch when
// the pool is being constructed / refilled.
InitialBatchSize *int `yaml:"initialBatchSize"`

// MaxBatchSize controls the maximum size that a pooled WriteBatch can grow to
// and still remain in the pool.
MaxBatchSize *int `yaml:"maxBatchSize"`

// Pool is the Pooling Policy for the WriteBatch pool.
Pool PoolPolicy `yaml:"pool"`
}

// PoolPolicy returns the PoolPolicy that is represented by the ContextPoolPolicy.
func (c ContextPoolPolicy) PoolPolicy() PoolPolicy {
return PoolPolicy{
Size: c.Size,
Expand All @@ -185,7 +202,7 @@ func (c ContextPoolPolicy) PoolPolicy() PoolPolicy {
}

// MaxFinalizerCapacityWithDefault returns the maximum finalizer capacity and
// fallsback to the default value if its not set
// fallsback to the default value if its not set.
func (c ContextPoolPolicy) MaxFinalizerCapacityWithDefault() int {
if c.MaxFinalizerCapacity == 0 {
return defaultMaxFinalizerCapacity
Expand Down
9 changes: 5 additions & 4 deletions src/dbnode/integration/commitlog_bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3x/context"
"github.com/m3db/m3x/ident"
xtime "github.com/m3db/m3x/time"
Expand Down Expand Up @@ -152,18 +153,18 @@ func writeCommitLogDataBase(
)

// Write out commit log data
for ts, blk := range data {
for currTs, blk := range data {
if specifiedTS != nil {
s.setNowFn(*specifiedTS)
} else {
s.setNowFn(ts.ToTime())
s.setNowFn(currTs.ToTime())
}

ctx := context.NewContext()
defer ctx.Close()

m := map[xtime.UnixNano]generate.SeriesBlock{
ts: blk,
currTs: blk,
}

points := generate.
Expand All @@ -179,7 +180,7 @@ func writeCommitLogDataBase(
for _, point := range points {
series, ok := seriesLookup[point.ID.String()]
require.True(t, ok)
cId := commitlog.Series{
cId := ts.Series{
Namespace: namespace.ID(),
Shard: shardSet.Lookup(point.ID),
ID: point.ID,
Expand Down
Loading

0 comments on commit 87f9a5b

Please sign in to comment.