Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the ability to write in batches in M3DB database #1157

Merged
merged 87 commits into from
Nov 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
59ee120
Convert commitlog chan to batches
Nov 6, 2018
4176ec9
refactor
Nov 6, 2018
88d10ae
Add write batch function
Nov 6, 2018
72f7d42
Fast commitlog path
Nov 7, 2018
9e4298e
Add comments
Nov 7, 2018
af402e2
Break benchmarks into separate file
Nov 7, 2018
002c606
Move fast encoder to its own file
Nov 7, 2018
fe8f67f
Revert "Add write batch function"
Nov 7, 2018
c832f1a
Revert "refactor"
Nov 7, 2018
14fffe8
Revert "Convert commitlog chan to batches"
Nov 7, 2018
65d5e59
Comment out dead code
Nov 8, 2018
4194a30
sort imports
Nov 8, 2018
4f32c2d
remove commitlog from namespace and return commitlog.series from shar…
Nov 6, 2018
f80a127
Return commitlog.Series from namespace write methods
Nov 6, 2018
8a8d73c
Write to commitlog from database
Nov 6, 2018
0add432
Fix broken tests
Nov 6, 2018
71f9f83
Add WriteTaggedBatch to database
Nov 6, 2018
60b6d20
Fix lint issues
Nov 6, 2018
7213bff
Move commitlog.Series to ts.Series
Nov 8, 2018
ab01d5f
Move writebatch to types.go
Nov 8, 2018
3491132
regen mocks
Nov 8, 2018
94f0702
Fix type issues and regen mocks
Nov 8, 2018
27d6ac5
Fix more type issues
Nov 8, 2018
2cee069
Refactor commitlog code to not allocate slice for individual writes
Nov 8, 2018
6b9731b
Dont allocate function
Nov 8, 2018
373e84f
wip
Nov 8, 2018
1cc04b5
wip
Nov 8, 2018
a60ac6a
WIP
Nov 8, 2018
3c5eeb1
Add tests
Nov 8, 2018
7f044cc
more tests for writebatch
Nov 9, 2018
e1c1238
more refactoring
Nov 9, 2018
34f32b3
Add method to interface
Nov 9, 2018
bab30a3
Add pooling for batch
Nov 9, 2018
0dfce36
init pool
Nov 9, 2018
aa0e081
fix stuff
Nov 9, 2018
8b4e951
Fix iter and pooling
Nov 11, 2018
96358e1
update queue length metric
Nov 11, 2018
b82cb3b
hook to thrift
Nov 11, 2018
0f49a85
Fix typo
Nov 12, 2018
7bc2865
Update tests and add errors
Nov 12, 2018
2718868
Fix compilation issues
Nov 12, 2018
10b83fc
Improve batch error handling
Nov 12, 2018
35cfb6f
Update tests for service code
Nov 12, 2018
46022e7
Propagate namespace in WriteBatch
Nov 12, 2018
45803ce
Add database tests
Nov 12, 2018
1efa96f
update test
Nov 12, 2018
1a56542
improve comment
Nov 12, 2018
1e948a7
Increment enqueued in writeWait
Nov 12, 2018
954153b
Move writeBatchPool to ts package
Nov 12, 2018
ffeb5df
Add setter and getter to options
Nov 12, 2018
9c02f0d
Wire up pooling config
Nov 12, 2018
f2c9577
Set struct field properly
Nov 12, 2018
e473c4c
Fix metrics
Nov 12, 2018
828a098
Reorder interface methods
Nov 12, 2018
ed4e91a
Remove need for typecase
Nov 12, 2018
2cb5b56
Simplify maxBatchSize propagation
Nov 12, 2018
ab2751a
Add concept of initialBatchSize
Nov 13, 2018
74edcc1
fix lint issue
Nov 13, 2018
fe87ccc
Fix flaky test
Nov 13, 2018
e051893
Fix config yaml names and broken test
Nov 13, 2018
39846e3
Fix broken service code and tests
Nov 13, 2018
a0b575c
Fix broken success metrics
Nov 13, 2018
008b863
Fix lint issues
Nov 13, 2018
f0de17c
fix unused var
Nov 13, 2018
6a13c7d
Dont write to commitlog if error
Nov 13, 2018
794085c
Fix error handling
Nov 13, 2018
310a413
Fix more error handling
Nov 13, 2018
642c7aa
wire up new config for commitlog;
Nov 14, 2018
3ad50ef
Block on writing to commitlog queue channel
Nov 14, 2018
ba94b96
Fix missing unlock
Nov 14, 2018
7b3c096
Optimistically inc enqueued writes
Nov 15, 2018
4f94320
Update database interfaces to have IndexedErrorHandle interface
Nov 15, 2018
dae8e38
update thrift code to allocate less errors
Nov 15, 2018
0e80b47
Allow queueChannel config to be nil
Nov 15, 2018
4a9cc74
rename var for clarity
Nov 15, 2018
628a962
simplify error handling
Nov 15, 2018
589246a
Fix typo
Nov 15, 2018
bb852b7
fix comment
Nov 15, 2018
a1fcddf
update comment
Nov 15, 2018
94f087a
Clear fields and resize slice in BatchWrite finalize
Nov 15, 2018
780fb3c
Remove shardFn
Nov 15, 2018
0d8a9a4
Fix broken test
Nov 15, 2018
8a0c4e9
Fixbroken test
Nov 15, 2018
134f367
Handle individual write sand batch write similarly
Nov 16, 2018
e157bce
Update comment
Nov 16, 2018
af10b59
Add clarifying comment
Nov 16, 2018
dedfb58
fix nits
Nov 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,23 @@ type CommitLogPolicy struct {
FlushEvery time.Duration `yaml:"flushEvery" validate:"nonzero"`

// The queue the commit log will keep in front of the current commit log segment.
// Modifying values in this policy will control how many pending writes can be
// in the commitlog queue before M3DB will begin rejecting writes.
Queue CommitLogQueuePolicy `yaml:"queue" validate:"nonzero"`

// The actual Golang channel that implements the commit log queue. We separate this
// from the Queue field for historical / legacy reasons. Generally speaking, the
// values in this config should not need to be modified, but we leave it in for
// tuning purposes. Unlike the Queue field, values in this policy control the size
// of the channel that backs the queue. Since writes to the commitlog are batched,
// setting the size of this policy will control how many batches can be queued, and
// indrectly how many writes can be queued, but that is dependent on the batch size
// of the client. As a result, we recommend that users avoid tuning this field and
// modify the Queue size instead which maps directly to the number of writes. This
// works in most cases because the default size of the QueueChannel should be large
// enough for almost all workloads assuming a reasonable batch size is used.
QueueChannel *CommitLogQueuePolicy `yaml:"queueChannel"`

// The commit log block size.
BlockSize time.Duration `yaml:"blockSize" validate:"nonzero"`
}
Expand Down
15 changes: 15 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ db:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
writeBatchPool:
initialBatchSize: 128
maxBatchSize: 100000
pool:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
identifierPool:
size: 9437184
lowWatermark: 0.01
Expand Down Expand Up @@ -389,6 +396,7 @@ db:
queue:
calculationType: fixed
size: 2097152
queueChannel: null
blockSize: 10m0s
repair:
enabled: false
Expand Down Expand Up @@ -522,6 +530,13 @@ db:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
writeBatchPool:
initialBatchSize: 128
maxBatchSize: 100000
pool:
size: 8192
lowWatermark: 0.01
highWatermark: 0.02
config:
service:
zone: embedded
Expand Down
95 changes: 56 additions & 39 deletions src/cmd/services/m3dbnode/config/pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,120 +34,123 @@ const (

// PoolingPolicy specifies the pooling policy.
type PoolingPolicy struct {
// The initial alloc size for a block
// The initial alloc size for a block.
BlockAllocSize int `yaml:"blockAllocSize"`

// The general pool type (currently only supported: simple).
Type PoolingType `yaml:"type"`

// The Bytes pool buckets to use
// The Bytes pool buckets to use.
BytesPool BucketPoolPolicy `yaml:"bytesPool"`

// The policy for the Closers pool
// The policy for the Closers pool.
ClosersPool PoolPolicy `yaml:"closersPool"`

// The policy for the Context pool
// The policy for the Context pool.
ContextPool ContextPoolPolicy `yaml:"contextPool"`

// The policy for the DatabaseSeries pool
// The policy for the DatabaseSeries pool.
SeriesPool PoolPolicy `yaml:"seriesPool"`

// The policy for the DatabaseBlock pool
// The policy for the DatabaseBlock pool.
BlockPool PoolPolicy `yaml:"blockPool"`

// The policy for the Encoder pool
// The policy for the Encoder pool.
EncoderPool PoolPolicy `yaml:"encoderPool"`

// The policy for the Iterator pool
// The policy for the Iterator pool.
IteratorPool PoolPolicy `yaml:"iteratorPool"`

// The policy for the Segment Reader pool
// The policy for the Segment Reader pool.
SegmentReaderPool PoolPolicy `yaml:"segmentReaderPool"`

// The policy for the Identifier pool
// The policy for the Identifier pool.
IdentifierPool PoolPolicy `yaml:"identifierPool"`

// The policy for the FetchBlockMetadataResult pool
// The policy for the FetchBlockMetadataResult pool.
FetchBlockMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlockMetadataResultsPool"`

// The policy for the FetchBlocksMetadataResults pool
// The policy for the FetchBlocksMetadataResults pool.
FetchBlocksMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlocksMetadataResultsPool"`

// The policy for the HostBlockMetadataSlice pool
// The policy for the HostBlockMetadataSlice pool.
HostBlockMetadataSlicePool CapacityPoolPolicy `yaml:"hostBlockMetadataSlicePool"`

// The policy for the BlockMetadat pool
// The policy for the BlockMetadat pool.
BlockMetadataPool PoolPolicy `yaml:"blockMetadataPool"`

// The policy for the BlockMetadataSlice pool
// The policy for the BlockMetadataSlice pool.
BlockMetadataSlicePool CapacityPoolPolicy `yaml:"blockMetadataSlicePool"`

// The policy for the BlocksMetadata pool
// The policy for the BlocksMetadata pool.
BlocksMetadataPool PoolPolicy `yaml:"blocksMetadataPool"`

// The policy for the BlocksMetadataSlice pool
// The policy for the BlocksMetadataSlice pool.
BlocksMetadataSlicePool CapacityPoolPolicy `yaml:"blocksMetadataSlicePool"`

// The policy for the tags pool
// The policy for the tags pool.
TagsPool MaxCapacityPoolPolicy `yaml:"tagsPool"`

// The policy for the tags iterator pool
// The policy for the tags iterator pool.
TagsIteratorPool PoolPolicy `yaml:"tagIteratorPool"`

// The policy for the index.ResultsPool
// The policy for the index.ResultsPool.
IndexResultsPool PoolPolicy `yaml:"indexResultsPool"`

// The policy for the TagEncoderPool
// The policy for the TagEncoderPool.
TagEncoderPool PoolPolicy `yaml:"tagEncoderPool"`

// The policy for the TagDecoderPool
// The policy for the TagDecoderPool.
TagDecoderPool PoolPolicy `yaml:"tagDecoderPool"`

// The policy for the WriteBatchPool.
WriteBatchPool WriteBatchPoolPolicy `yaml:"writeBatchPool"`
}

// PoolPolicy specifies a single pool policy.
type PoolPolicy struct {
// The size of the pool
// The size of the pool.
Size int `yaml:"size"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`
}

// CapacityPoolPolicy specifies a single pool policy that has a
// per element capacity.
type CapacityPoolPolicy struct {
// The size of the pool
// The size of the pool.
Size int `yaml:"size"`

// The capacity of items in the pool
// The capacity of items in the pool.
Capacity int `yaml:"capacity"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`
}

// MaxCapacityPoolPolicy specifies a single pool policy that has a
// per element capacity, and a maximum allowed capacity as well.
type MaxCapacityPoolPolicy struct {
// The size of the pool
// The size of the pool.
Size int `yaml:"size"`

// The capacity of items in the pool
// The capacity of items in the pool.
Capacity int `yaml:"capacity"`

// The max capacity of items in the pool
// The max capacity of items in the pool.
MaxCapacity int `yaml:"maxCapacity"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`
}

Expand All @@ -157,15 +160,15 @@ type BucketPoolPolicy struct {
Buckets []CapacityPoolPolicy `yaml:"buckets"`
}

// ContextPoolPolicy specifies the policy for the context pool
// ContextPoolPolicy specifies the policy for the context pool.
type ContextPoolPolicy struct {
// The size of the pool
Size int `yaml:"size"`

// The low watermark to start refilling the pool, if zero none
// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"`

// The high watermark to stop refilling the pool, if zero none
// The high watermark to stop refilling the pool, if zero none.
RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"`

// The maximum allowable size for a slice of finalizers that the
Expand All @@ -175,7 +178,21 @@ type ContextPoolPolicy struct {
MaxFinalizerCapacity int `yaml:"maxFinalizerCapacity" validate:"min=0"`
}

// PoolPolicy returns the PoolPolicy that is represented by the ContextPoolPolicy
// WriteBatchPoolPolicy specifies the pooling policy for the WriteBatch pool.
type WriteBatchPoolPolicy struct {
// InitialBatchSize controls the initial batch size for each WriteBatch when
// the pool is being constructed / refilled.
InitialBatchSize *int `yaml:"initialBatchSize"`

// MaxBatchSize controls the maximum size that a pooled WriteBatch can grow to
// and still remain in the pool.
MaxBatchSize *int `yaml:"maxBatchSize"`

// Pool is the Pooling Policy for the WriteBatch pool.
Pool PoolPolicy `yaml:"pool"`
}

// PoolPolicy returns the PoolPolicy that is represented by the ContextPoolPolicy.
func (c ContextPoolPolicy) PoolPolicy() PoolPolicy {
return PoolPolicy{
Size: c.Size,
Expand All @@ -185,7 +202,7 @@ func (c ContextPoolPolicy) PoolPolicy() PoolPolicy {
}

// MaxFinalizerCapacityWithDefault returns the maximum finalizer capacity and
// fallsback to the default value if its not set
// fallsback to the default value if its not set.
func (c ContextPoolPolicy) MaxFinalizerCapacityWithDefault() int {
if c.MaxFinalizerCapacity == 0 {
return defaultMaxFinalizerCapacity
Expand Down
9 changes: 5 additions & 4 deletions src/dbnode/integration/commitlog_bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3x/context"
"github.com/m3db/m3x/ident"
xtime "github.com/m3db/m3x/time"
Expand Down Expand Up @@ -152,18 +153,18 @@ func writeCommitLogDataBase(
)

// Write out commit log data
for ts, blk := range data {
for currTs, blk := range data {
if specifiedTS != nil {
s.setNowFn(*specifiedTS)
} else {
s.setNowFn(ts.ToTime())
s.setNowFn(currTs.ToTime())
}

ctx := context.NewContext()
defer ctx.Close()

m := map[xtime.UnixNano]generate.SeriesBlock{
ts: blk,
currTs: blk,
}

points := generate.
Expand All @@ -179,7 +180,7 @@ func writeCommitLogDataBase(
for _, point := range points {
series, ok := seriesLookup[point.ID.String()]
require.True(t, ok)
cId := commitlog.Series{
cId := ts.Series{
Namespace: namespace.ID(),
Shard: shardSet.Lookup(point.ID),
ID: point.ID,
Expand Down
Loading