From 2b605670020fc637e20a609b1dc86c1f0e7afdd1 Mon Sep 17 00:00:00 2001 From: Austin Clements Date: Fri, 1 Mar 2019 13:16:37 -0500 Subject: [PATCH] sync: internal fixed size lock-free queue for sync.Pool This is the first step toward fixing multiple issues with sync.Pool. This adds a fixed size, lock-free, single-producer, multi-consumer queue that will be used in the new Pool stealing implementation. For #22950, #22331. Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5 Reviewed-on: https://go-review.googlesource.com/c/go/+/166957 Run-TryBot: Austin Clements TryBot-Result: Gobot Gobot Reviewed-by: David Chase --- src/sync/export_test.go | 25 ++++++ src/sync/pool_test.go | 73 ++++++++++++++++ src/sync/poolqueue.go | 185 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 283 insertions(+) create mode 100644 src/sync/poolqueue.go diff --git a/src/sync/export_test.go b/src/sync/export_test.go index 669076efad3f12..0252b64f58919a 100644 --- a/src/sync/export_test.go +++ b/src/sync/export_test.go @@ -9,3 +9,28 @@ var Runtime_Semacquire = runtime_Semacquire var Runtime_Semrelease = runtime_Semrelease var Runtime_procPin = runtime_procPin var Runtime_procUnpin = runtime_procUnpin + +// poolDequeue testing. +type PoolDequeue interface { + PushHead(val interface{}) bool + PopHead() (interface{}, bool) + PopTail() (interface{}, bool) +} + +func NewPoolDequeue(n int) PoolDequeue { + return &poolDequeue{ + vals: make([]eface, n), + } +} + +func (d *poolDequeue) PushHead(val interface{}) bool { + return d.pushHead(val) +} + +func (d *poolDequeue) PopHead() (interface{}, bool) { + return d.popHead() +} + +func (d *poolDequeue) PopTail() (interface{}, bool) { + return d.popTail() +} diff --git a/src/sync/pool_test.go b/src/sync/pool_test.go index 9e5132bb18e168..6e9f9f3463c253 100644 --- a/src/sync/pool_test.go +++ b/src/sync/pool_test.go @@ -150,6 +150,79 @@ func TestPoolStress(t *testing.T) { } } +func TestPoolDequeue(t *testing.T) { + const P = 10 + // In long mode, do enough pushes to wrap around the 21-bit + // indexes. + N := 1<<21 + 1000 + if testing.Short() { + N = 1e3 + } + d := NewPoolDequeue(16) + have := make([]int32, N) + var stop int32 + var wg WaitGroup + + // Start P-1 consumers. + for i := 1; i < P; i++ { + wg.Add(1) + go func() { + fail := 0 + for atomic.LoadInt32(&stop) == 0 { + val, ok := d.PopTail() + if ok { + fail = 0 + atomic.AddInt32(&have[val.(int)], 1) + if val.(int) == N-1 { + atomic.StoreInt32(&stop, 1) + } + } else { + // Speed up the test by + // allowing the pusher to run. + if fail++; fail%100 == 0 { + runtime.Gosched() + } + } + } + wg.Done() + }() + } + + // Start 1 producer. + nPopHead := 0 + wg.Add(1) + go func() { + for j := 0; j < N; j++ { + for !d.PushHead(j) { + // Allow a popper to run. + runtime.Gosched() + } + if j%10 == 0 { + val, ok := d.PopHead() + if ok { + nPopHead++ + atomic.AddInt32(&have[val.(int)], 1) + } + } + } + wg.Done() + }() + wg.Wait() + + // Check results. + for i, count := range have { + if count != 1 { + t.Errorf("expected have[%d] = 1, got %d", i, count) + } + } + if nPopHead == 0 { + // In theory it's possible in a valid schedule for + // popHead to never succeed, but in practice it almost + // always succeeds, so this is unlikely to flake. + t.Errorf("popHead never succeeded") + } +} + func BenchmarkPool(b *testing.B) { var p Pool b.RunParallel(func(pb *testing.PB) { diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go new file mode 100644 index 00000000000000..bc2ab647ffbc70 --- /dev/null +++ b/src/sync/poolqueue.go @@ -0,0 +1,185 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// poolDequeue is a lock-free fixed-size single-producer, +// multi-consumer queue. The single producer can both push and pop +// from the head, and consumers can pop from the tail. +// +// It has the added feature that it nils out unused slots to avoid +// unnecessary retention of objects. This is important for sync.Pool, +// but not typically a property considered in the literature. +type poolDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // vals[i].typ is nil if the slot is empty and non-nil + // otherwise. A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []eface +} + +type eface struct { + typ, val unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a poolDequeue. +// +// This is half of 1<> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *poolDequeue) pack(head, tail uint32) uint64 { + const mask = 1<