enable shrinking the socket pool size (#116)
* enable shrinking the socket pool size

We found that mgo grows the socket pool during burst traffic but never closes the
sockets again until the client or the server is restarted.

The MongoDB documentation defines two related connection string options:

- [minPoolSize](https://docs.mongodb.com/manual/reference/connection-string/#urioption.minPoolSize)
- [maxIdleTimeMS](https://docs.mongodb.com/manual/reference/connection-string/#urioption.maxIdleTimeMS)

By implementing these two options, the driver can shrink the pool back to minPoolSize
once the sockets introduced by burst traffic have been idle longer than the timeout.
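
As a usage sketch (not part of this commit), a client opts in by setting both options
on the connection string. The import path, the address, and the 30-second idle timeout
below are illustrative assumptions, not values mandated by this patch:

package main

import (
	"log"

	mgo "github.com/globalsign/mgo" // assumed import path; use whichever mgo fork carries this patch
)

func main() {
	// Keep at least 1 idle socket per server, and let the shrinker close
	// sockets that have been idle for more than 30000 ms (30 seconds).
	session, err := mgo.Dial("localhost:27017?minPoolSize=1&maxIdleTimeMS=30000")
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// Run queries as usual; sockets left idle beyond maxIdleTimeMS are
	// reclaimed by the per-server shrinker, down to minPoolSize.
}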

The idea comes from https://github.com/JodeZer/mgo , whose author investigated
this issue and provided the initial commits.

I found there were still some issues in socket maintenance, and opened a PR against
his repo: JodeZer#1 .

This commit includes JodeZer's commits and my fix, and I simplified the data structure.
What's in this commit can be described by this figure:

+------------------------+
| Session                | <-------+ Add options here
+------------------------+

+------------------------+
| Cluster                | <-------+ Add options here
+------------------------+

+------------------------+
| Server                 | <-------+ * Add options here
|                        |           * add a timestamp when recycling a socket
|   +-----------+        |           * periodically check the unused sockets
|   | shrinker  |        |             and reclaim the timed-out sockets
|   +-----------+        |
|                        |
+------------------------+

+------------------------+
| Socket                 | <-------+ Add a field for the last used time
+------------------------+

Signed-off-by: Wang Xu <[email protected]>

* tests for shrinking the socket pool

Signed-off-by: Wang Xu <[email protected]>
gnawux authored and domodwyer committed Mar 1, 2018
1 parent 91cf46c commit 860240e
Showing 5 changed files with 212 additions and 24 deletions.
36 changes: 20 additions & 16 deletions cluster.go
@@ -48,21 +48,23 @@ import (

type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
minPoolSize int
maxIdleTimeMS int
}

func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
@@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() {
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
cluster.RLock()
server := cluster.servers.Search(tcpaddr.String())
minPoolSize := cluster.minPoolSize
maxIdleTimeMS := cluster.maxIdleTimeMS
cluster.RUnlock()
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
}

func resolveAddr(addr string) (*net.TCPAddr, error) {
71 changes: 63 additions & 8 deletions server.go
@@ -55,6 +55,8 @@ type mongoServer struct {
pingCount uint32
closed bool
abended bool
minPoolSize int
maxIdleTimeMS int
}

type dialer struct {
@@ -76,17 +78,22 @@ type mongoServerInfo struct {

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
go server.pinger(true)
if maxIdleTimeMS != 0 {
go server.poolShrinker()
}
return server
}

@@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) {
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
server.Lock()
if !server.closed {
socket.lastTimeUsed = time.Now()
server.unusedSockets = append(server.unusedSockets, socket)
}
server.Unlock()
@@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) {
}
}

func (server *mongoServer) poolShrinker() {
ticker := time.NewTicker(1 * time.Minute)
for _ = range ticker.C {
if server.closed {
ticker.Stop()
return
}
server.Lock()
unused := len(server.unusedSockets)
if unused < server.minPoolSize {
server.Unlock()
continue
}
now := time.Now()
end := 0
reclaimMap := map[*mongoSocket]struct{}{}
// Because sockets are acquired and recycled at the tail of the array,
// the head is always the oldest unused socket.
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
break
}
end++
reclaimMap[s] = struct{}{}
}
tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*mongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
remainSockets := []*mongoSocket{}
for _, s := range server.liveSockets {
if _, ok := reclaimMap[s]; !ok {
remainSockets = append(remainSockets, s)
}
}
server.liveSockets = remainSockets
stats.conn(-1*end, server.info.Master)
}
server.Unlock()

for _, s := range tbr {
s.Close()
}
}
}

type mongoServerSlice []*mongoServer

func (s mongoServerSlice) Len() int {
42 changes: 42 additions & 0 deletions session.go
@@ -271,6 +271,16 @@ const (
// Defines the per-server socket pool limit. Defaults to 4096.
// See Session.SetPoolLimit for details.
//
// minPoolSize=<limit>
//
// Defines the per-server socket pool minimum size. Defaults to 0.
//
// maxIdleTimeMS=<millisecond>
//
// The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed. If maxIdleTimeMS is 0, connections will never be
// closed due to inactivity.
//
// appName=<appName>
//
// The identifier of this client application. This parameter is used to
@@ -322,6 +332,8 @@ func ParseURL(url string) (*DialInfo, error) {
appName := ""
readPreferenceMode := Primary
var readPreferenceTagSets []bson.D
minPoolSize := 0
maxIdleTimeMS := 0
for _, opt := range uinfo.options {
switch opt.key {
case "authSource":
@@ -368,6 +380,22 @@ func ParseURL(url string) (*DialInfo, error) {
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
}
readPreferenceTagSets = append(readPreferenceTagSets, doc)
case "minPoolSize":
minPoolSize, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for minPoolSize: " + opt.value)
}
if minPoolSize < 0 {
return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value)
}
case "maxIdleTimeMS":
maxIdleTimeMS, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
}
if maxIdleTimeMS < 0 {
return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value)
}
case "connect":
if opt.value == "direct" {
direct = true
@@ -402,6 +430,8 @@ func ParseURL(url string) (*DialInfo, error) {
TagSets: readPreferenceTagSets,
},
ReplicaSetName: setName,
MinPoolSize: minPoolSize,
MaxIdleTimeMS: maxIdleTimeMS,
}
return &info, nil
}
@@ -475,6 +505,14 @@ type DialInfo struct {
// cluster and establish connections with further servers too.
Direct bool

// MinPoolSize defines the minimum number of connections in the connection pool.
// Defaults to 0.
MinPoolSize int

// MaxIdleTimeMS is the maximum number of milliseconds that a connection can remain
// idle in the pool before being removed and closed.
MaxIdleTimeMS int

// DialServer optionally specifies the dial function for establishing
// connections with the MongoDB servers.
DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -554,6 +592,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
if info.PoolLimit > 0 {
session.poolLimit = info.PoolLimit
}

cluster.minPoolSize = info.MinPoolSize
cluster.maxIdleTimeMS = info.MaxIdleTimeMS

cluster.Release()

// People get confused when we return a session that is not actually
86 changes: 86 additions & 0 deletions session_test.go
@@ -30,11 +30,13 @@ import (
"flag"
"fmt"
"math"
"math/rand"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

@@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) {
}
}

func (s *S) TestMinPoolSize(c *C) {
tests := []struct {
url string
size int
fail bool
}{
{"localhost:40001?minPoolSize=0", 0, false},
{"localhost:40001?minPoolSize=1", 1, false},
{"localhost:40001?minPoolSize=-1", -1, true},
{"localhost:40001?minPoolSize=-.", 0, true},
}
for _, test := range tests {
info, err := mgo.ParseURL(test.url)
if test.fail {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(info.MinPoolSize, Equals, test.size)
}
}
}

func (s *S) TestMaxIdleTimeMS(c *C) {
tests := []struct {
url string
size int
fail bool
}{
{"localhost:40001?maxIdleTimeMS=0", 0, false},
{"localhost:40001?maxIdleTimeMS=1", 1, false},
{"localhost:40001?maxIdleTimeMS=-1", -1, true},
{"localhost:40001?maxIdleTimeMS=-.", 0, true},
}
for _, test := range tests {
info, err := mgo.ParseURL(test.url)
if test.fail {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(info.MaxIdleTimeMS, Equals, test.size)
}
}
}

func (s *S) TestPoolShrink(c *C) {
if *fast {
c.Skip("-fast")
}
oldSocket := mgo.GetStats().SocketsAlive

session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000")
c.Assert(err, IsNil)
defer session.Close()

parallel := 10
res := make(chan error, parallel+1)
wg := &sync.WaitGroup{}
for i := 1; i < parallel; i++ {
wg.Add(1)
go func() {
s := session.Copy()
defer s.Close()
result := struct{}{}
err := s.Run("ping", &result)

// Sleep a random amount of time so that sockets are allocated and released in a different order.
time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond)
res <- err
wg.Done()
}()
}
wg.Wait()
stats := mgo.GetStats()
c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket)

// Give the pool some time to shrink; the shrinker tick is set to 1 minute.
c.Log("Sleeping for 1 minute to allow the pool to shrink")
time.Sleep(60 * time.Second)

stats = mgo.GetStats()
c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket)
c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false)
}

func (s *S) TestURLReadPreferenceTags(c *C) {
type test struct {
url string
1 change: 1 addition & 0 deletions socket.go
@@ -54,6 +54,7 @@ type mongoSocket struct {
dead error
serverInfo *mongoServerInfo
closeAfterIdle bool
lastTimeUsed time.Time // for time-based idle socket release
sendMeta sync.Once
}

Expand Down
