Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable shrink the socket pool size #116

Merged
merged 2 commits into from
Mar 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,23 @@ import (

type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
minPoolSize int
maxIdleTimeMS int
}

func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
Expand Down Expand Up @@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() {
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
cluster.RLock()
server := cluster.servers.Search(tcpaddr.String())
minPoolSize := cluster.minPoolSize
maxIdleTimeMS := cluster.maxIdleTimeMS
cluster.RUnlock()
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
}

func resolveAddr(addr string) (*net.TCPAddr, error) {
Expand Down
71 changes: 63 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type mongoServer struct {
pingCount uint32
closed bool
abended bool
minPoolSize int
maxIdleTimeMS int
}

type dialer struct {
Expand All @@ -76,17 +78,22 @@ type mongoServerInfo struct {

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
go server.pinger(true)
if maxIdleTimeMS != 0 {
go server.poolShrinker()
}
return server
}

Expand Down Expand Up @@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) {
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
server.Lock()
if !server.closed {
socket.lastTimeUsed = time.Now()
server.unusedSockets = append(server.unusedSockets, socket)
}
server.Unlock()
Expand Down Expand Up @@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) {
}
}

func (server *mongoServer) poolShrinker() {
ticker := time.NewTicker(1 * time.Minute)
for _ = range ticker.C {
if server.closed {
ticker.Stop()
return
}
server.Lock()
unused := len(server.unusedSockets)
if unused < server.minPoolSize {
server.Unlock()
continue
}
now := time.Now()
end := 0
reclaimMap := map[*mongoSocket]struct{}{}
// Because the acquisition and recycle are done at the tail of array,
// the head is always the oldest unused socket.
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
break
}
end++
reclaimMap[s] = struct{}{}
}
tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*mongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
remainSockets := []*mongoSocket{}
for _, s := range server.liveSockets {
if _, ok := reclaimMap[s]; !ok {
remainSockets = append(remainSockets, s)
}
}
server.liveSockets = remainSockets
stats.conn(-1*end, server.info.Master)
}
server.Unlock()

for _, s := range tbr {
s.Close()
}
}
}

type mongoServerSlice []*mongoServer

func (s mongoServerSlice) Len() int {
Expand Down
42 changes: 42 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ const (
// Defines the per-server socket pool limit. Defaults to 4096.
// See Session.SetPoolLimit for details.
//
// minPoolSize=<limit>
//
// Defines the per-server socket pool minium size. Defaults to 0.
//
// maxIdleTimeMS=<millisecond>
//
// The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed. If maxIdleTimeMS is 0, connections will never be
// closed due to inactivity.
//
// appName=<appName>
//
// The identifier of this client application. This parameter is used to
Expand Down Expand Up @@ -322,6 +332,8 @@ func ParseURL(url string) (*DialInfo, error) {
appName := ""
readPreferenceMode := Primary
var readPreferenceTagSets []bson.D
minPoolSize := 0
maxIdleTimeMS := 0
for _, opt := range uinfo.options {
switch opt.key {
case "authSource":
Expand Down Expand Up @@ -368,6 +380,22 @@ func ParseURL(url string) (*DialInfo, error) {
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
}
readPreferenceTagSets = append(readPreferenceTagSets, doc)
case "minPoolSize":
minPoolSize, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for minPoolSize: " + opt.value)
}
if minPoolSize < 0 {
return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value)
}
case "maxIdleTimeMS":
maxIdleTimeMS, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
}
if maxIdleTimeMS < 0 {
return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value)
}
case "connect":
if opt.value == "direct" {
direct = true
Expand Down Expand Up @@ -402,6 +430,8 @@ func ParseURL(url string) (*DialInfo, error) {
TagSets: readPreferenceTagSets,
},
ReplicaSetName: setName,
MinPoolSize: minPoolSize,
MaxIdleTimeMS: maxIdleTimeMS,
}
return &info, nil
}
Expand Down Expand Up @@ -475,6 +505,14 @@ type DialInfo struct {
// cluster and establish connections with further servers too.
Direct bool

// MinPoolSize defines The minimum number of connections in the connection pool.
// Defaults to 0.
MinPoolSize int

//The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed.
MaxIdleTimeMS int

// DialServer optionally specifies the dial function for establishing
// connections with the MongoDB servers.
DialServer func(addr *ServerAddr) (net.Conn, error)
Expand Down Expand Up @@ -554,6 +592,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
if info.PoolLimit > 0 {
session.poolLimit = info.PoolLimit
}

cluster.minPoolSize = info.MinPoolSize
cluster.maxIdleTimeMS = info.MaxIdleTimeMS

cluster.Release()

// People get confused when we return a session that is not actually
Expand Down
86 changes: 86 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"flag"
"fmt"
"math"
"math/rand"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) {
}
}

func (s *S) TestMinPoolSize(c *C) {
tests := []struct {
url string
size int
fail bool
}{
{"localhost:40001?minPoolSize=0", 0, false},
{"localhost:40001?minPoolSize=1", 1, false},
{"localhost:40001?minPoolSize=-1", -1, true},
{"localhost:40001?minPoolSize=-.", 0, true},
}
for _, test := range tests {
info, err := mgo.ParseURL(test.url)
if test.fail {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(info.MinPoolSize, Equals, test.size)
}
}
}

func (s *S) TestMaxIdleTimeMS(c *C) {
tests := []struct {
url string
size int
fail bool
}{
{"localhost:40001?maxIdleTimeMS=0", 0, false},
{"localhost:40001?maxIdleTimeMS=1", 1, false},
{"localhost:40001?maxIdleTimeMS=-1", -1, true},
{"localhost:40001?maxIdleTimeMS=-.", 0, true},
}
for _, test := range tests {
info, err := mgo.ParseURL(test.url)
if test.fail {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(info.MaxIdleTimeMS, Equals, test.size)
}
}
}

func (s *S) TestPoolShrink(c *C) {
if *fast {
c.Skip("-fast")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this!

}
oldSocket := mgo.GetStats().SocketsAlive

session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000")
c.Assert(err, IsNil)
defer session.Close()

parallel := 10
res := make(chan error, parallel+1)
wg := &sync.WaitGroup{}
for i := 1; i < parallel; i++ {
wg.Add(1)
go func() {
s := session.Copy()
defer s.Close()
result := struct{}{}
err := s.Run("ping", &result)

//sleep random time to make the allocate and release in different sequence
time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond)
res <- err
wg.Done()
}()
}
wg.Wait()
stats := mgo.GetStats()
c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket)

// give some time for shrink the pool, the tick is set to 1 minute
c.Log("Sleeping... 1 minute to for pool shrinking")
time.Sleep(60 * time.Second)

stats = mgo.GetStats()
c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket)
c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false)
}

func (s *S) TestURLReadPreferenceTags(c *C) {
type test struct {
url string
Expand Down
1 change: 1 addition & 0 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type mongoSocket struct {
dead error
serverInfo *mongoServerInfo
closeAfterIdle bool
lastTimeUsed time.Time // for time based idle socket release
sendMeta sync.Once
}

Expand Down