Skip to content

Commit

Permalink
socket: amortise cost of querying OS time counter (globalsign#149)
Browse files Browse the repository at this point in the history
globalsign#116 adds a much needed ability to shrink the connection pool, but requires
tracking the last-used timestamp for each socket after every operation. Frequent
calls to time.Now() in the hot-path reduced read throughput by ~6% and increased
the latency (and variance) of socket operations as a whole.

This PR adds a periodically updated time value to amortise the cost of the last-
used bookkeeping, restoring the original throughput at the cost of approximate
last-used values (configured to be ~25ms of potential skew).

On some systems (currently including FreeBSD) querying the time counter also
requires a syscall/context switch.

Fixes globalsign#142.
  • Loading branch information
domodwyer authored Apr 19, 2018
1 parent 57bf42b commit 243904d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 1 deletion.
62 changes: 62 additions & 0 deletions coarse_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package mgo

import (
"sync"
"sync/atomic"
"time"
)

// coarseTimeProvider provides a periodically updated (approximate) time value to
// amortise the cost of frequent calls to time.Now.
//
// A read throughput increase of ~6% was measured when using coarseTimeProvider with the
// high-precision event timer (HPET) on FreeBSD 11.1 and Go 1.10.1 after merging
// #116.
//
// Calling Now returns a time.Time that is updated at the configured interval,
// however due to scheduling the value may be marginally older than expected.
//
// coarseTimeProvider is safe for concurrent use.
type coarseTimeProvider struct {
once sync.Once
stop chan struct{}
last atomic.Value
}

// Now returns the most recently acquired time.Time value.
func (t *coarseTimeProvider) Now() time.Time {
return t.last.Load().(time.Time)
}

// Close stops the periodic update of t.
//
// Any subsequent calls to Now will return the same value forever.
func (t *coarseTimeProvider) Close() {
t.once.Do(func() {
close(t.stop)
})
}

// newcoarseTimeProvider returns a coarseTimeProvider configured to update at granularity.
func newcoarseTimeProvider(granularity time.Duration) *coarseTimeProvider {
t := &coarseTimeProvider{
stop: make(chan struct{}),
}

t.last.Store(time.Now())

go func() {
ticker := time.NewTicker(granularity)
for {
select {
case <-t.stop:
ticker.Stop()
return
case <-ticker.C:
t.last.Store(time.Now())
}
}
}()

return t
}
23 changes: 23 additions & 0 deletions coarse_time_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package mgo

import (
"testing"
"time"
)

func TestCoarseTimeProvider(t *testing.T) {
t.Parallel()

const granularity = 50 * time.Millisecond

ct := newcoarseTimeProvider(granularity)
defer ct.Close()

start := ct.Now().Unix()
time.Sleep(time.Second)

got := ct.Now().Unix()
if got <= start {
t.Fatalf("got %d, expected at least %d", got, start)
}
}
14 changes: 13 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ import (
"github.com/globalsign/mgo/bson"
)

// coarseTime is used to amortise the cost of querying the timecounter (possibly
// incurring a syscall too) when setting a socket.lastTimeUsed value which
// happens frequently in the hot-path.
//
// The lastTimeUsed value may be skewed by at least 25ms (see
// coarseTimeProvider).
var coarseTime *coarseTimeProvider

func init() {
coarseTime = newcoarseTimeProvider(25 * time.Millisecond)
}

// ---------------------------------------------------------------------------
// Mongo server encapsulation.

Expand Down Expand Up @@ -293,7 +305,7 @@ func (server *mongoServer) close(waitForIdle bool) {
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
server.Lock()
if !server.closed {
socket.lastTimeUsed = time.Now()
socket.lastTimeUsed = coarseTime.Now() // A rough approximation of the current time - see courseTime
server.unusedSockets = append(server.unusedSockets, socket)
}
// If anybody is waiting for a connection, they should try now.
Expand Down

0 comments on commit 243904d

Please sign in to comment.