Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve M3DB session performance part 1: Pool goroutines in host queues #985

Merged
merged 10 commits into from
Sep 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package: github.com/m3db/m3
import:
- package: github.com/m3db/m3x
version: 76a586220279667a81eaaec4150de182f4d5077c
version: 3ab0cd67ff3c6e63a40893bad1ec4d993b5422c2
vcs: git
subpackages:
- checked
Expand Down Expand Up @@ -215,9 +215,19 @@ import:
repo: https://github.com/go-validator/validator.git
vcs: git

testImport:
- package: github.com/fortytw2/leaktest
version: b433bbd6d743c1854040b39062a3916ed5f78fe8
- package: gopkg.in/go-playground/validator.v9
version: a021b2ec9a8a8bb970f3f15bc42617cb520e8a64
repo: https://github.com/go-playground/validator.git
vcs: git

- package: gopkg.in/yaml.v2
version: 5420a8b6744d3b0345ab293f6fcba19c978f1183
repo: https://github.com/go-yaml/yaml.git
vcs: git

- package: github.com/leanovate/gopter
version: f0356731348c8fffa27bab27c37ec8be5b0662c8

testImport:
- package: github.com/fortytw2/leaktest
version: b433bbd6d743c1854040b39062a3916ed5f78fe8
63 changes: 41 additions & 22 deletions src/dbnode/client/host_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/pool"
xsync "github.com/m3db/m3x/sync"

"github.com/uber/tchannel-go/thrift"
)

const workerPoolKillProbability = 0.01

type queue struct {
sync.WaitGroup
sync.RWMutex
Expand All @@ -47,6 +50,7 @@ type queue struct {
writeBatchRawRequestElementArrayPool writeBatchRawRequestElementArrayPool
writeTaggedBatchRawRequestPool writeTaggedBatchRawRequestPool
writeTaggedBatchRawRequestElementArrayPool writeTaggedBatchRawRequestElementArrayPool
workerPool xsync.PooledWorkerPool
size int
ops []op
opsSumSize int
Expand All @@ -59,13 +63,30 @@ type queue struct {
func newHostQueue(
host topology.Host,
hostQueueOpts hostQueueOpts,
) hostQueue {
opts := hostQueueOpts.opts
scope := opts.InstrumentOptions().MetricsScope().
SubScope("hostqueue").
Tagged(map[string]string{
"hostID": host.ID(),
})
) (hostQueue, error) {
var (
opts = hostQueueOpts.opts
iOpts = opts.InstrumentOptions()
scope = iOpts.MetricsScope().
SubScope("hostqueue").
Tagged(map[string]string{
"hostID": host.ID(),
})
)
iOpts = iOpts.SetMetricsScope(scope)

workerPoolOpts := xsync.NewPooledWorkerPoolOptions().
SetGrowOnDemand(true).
SetKillWorkerProbability(workerPoolKillProbability).
SetInstrumentOptions(iOpts)
workerPool, err := xsync.NewPooledWorkerPool(
int(workerPoolOpts.NumShards()),
workerPoolOpts,
)
if err != nil {
return nil, err
}
workerPool.Init()

opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope))

Expand All @@ -90,11 +111,12 @@ func newHostQueue(
writeBatchRawRequestElementArrayPool: hostQueueOpts.writeBatchRawRequestElementArrayPool,
writeTaggedBatchRawRequestPool: hostQueueOpts.writeTaggedBatchRawRequestPool,
writeTaggedBatchRawRequestElementArrayPool: hostQueueOpts.writeTaggedBatchRawRequestElementArrayPool,
workerPool: workerPool,
size: size,
ops: opArrayPool.Get(),
opsArrayPool: opArrayPool,
drainIn: make(chan []op, opsArraysLen),
}
}, nil
}

func (q *queue) Open() {
Expand Down Expand Up @@ -283,8 +305,8 @@ func (q *queue) asyncTaggedWrite(
elems []*rpc.WriteTaggedBatchRawRequestElement,
) {
q.Add(1)
// TODO(r): Use a worker pool to avoid creating new go routines for async writes
go func() {

q.workerPool.Go(func() {
req := q.writeTaggedBatchRawRequestPool.Get()
req.NameSpace = namespace.Bytes()
req.Elements = elems
Expand Down Expand Up @@ -339,7 +361,7 @@ func (q *queue) asyncTaggedWrite(
// Entire batch failed
callAllCompletionFns(ops, q.host, err)
cleanup()
}()
})
}

func (q *queue) asyncWrite(
Expand All @@ -348,8 +370,7 @@ func (q *queue) asyncWrite(
elems []*rpc.WriteBatchRawRequestElement,
) {
q.Add(1)
// TODO(r): Use a worker pool to avoid creating new go routines for async writes
go func() {
q.workerPool.Go(func() {
req := q.writeBatchRawRequestPool.Get()
req.NameSpace = namespace.Bytes()
req.Elements = elems
Expand Down Expand Up @@ -404,13 +425,12 @@ func (q *queue) asyncWrite(
// Entire batch failed
callAllCompletionFns(ops, q.host, err)
cleanup()
}()
})
}

func (q *queue) asyncFetch(op *fetchBatchOp) {
q.Add(1)
// TODO(r): Use a worker pool to avoid creating new go routines for async fetches
go func() {
q.workerPool.Go(func() {
// NB(r): Defer is slow in the hot path unfortunately
cleanup := func() {
op.DecRef()
Expand Down Expand Up @@ -449,13 +469,12 @@ func (q *queue) asyncFetch(op *fetchBatchOp) {
op.complete(i, result.Elements[i].Segments, nil)
}
cleanup()
}()
})
}

func (q *queue) asyncFetchTagged(op *fetchTaggedOp) {
q.Add(1)
// TODO(r): Use a worker pool to avoid creating new go routines for async fetches
go func() {
q.workerPool.Go(func() {
// NB(r): Defer is slow in the hot path unfortunately
cleanup := func() {
op.decRef()
Expand Down Expand Up @@ -483,13 +502,13 @@ func (q *queue) asyncFetchTagged(op *fetchTaggedOp) {
response: result,
}, err)
cleanup()
}()
})
}

func (q *queue) asyncTruncate(op *truncateOp) {
q.Add(1)

go func() {
q.workerPool.Go(func() {
cleanup := q.Done

client, err := q.connPool.NextClient()
Expand All @@ -508,7 +527,7 @@ func (q *queue) asyncTruncate(op *truncateOp) {
}

cleanup()
}()
})
}

func (q *queue) Len() int {
Expand Down
Loading