Skip to content

Commit

Permalink
make the releaser simpler
Browse files Browse the repository at this point in the history
Because the acquire and recycle are done at the tail of array,
the head is always the oldest unused socket.

Signed-off-by: Wang Xu <[email protected]>
  • Loading branch information
gnawux committed Jan 25, 2018
1 parent 6f7ed6c commit 7c60f0a
Showing 1 changed file with 34 additions and 39 deletions.
73 changes: 34 additions & 39 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize int, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
go server.pinger(true)
Expand Down Expand Up @@ -217,8 +217,8 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
if !server.closed {
now := time.Now()
server.unusedSockets = append(server.unusedSockets, &timedMongoSocket{
lastTimeUsed:&now,
soc:socket,
lastTimeUsed: &now,
soc: socket,
})
}
server.Unlock()
Expand Down Expand Up @@ -358,44 +358,39 @@ func (server *mongoServer) pinger(loop bool) {
}

func (server *mongoServer) releaser() {
for {

time.Sleep(1 * time.Minute)
ticker := time.NewTicker(1 * time.Minute)
for _ = range ticker.C {
if server.closed {
ticker.Stop()
return
}
server.RLock()
if len(server.unusedSockets) < server.minPoolSize {
server.RUnlock()
server.Lock()
unused := len(server.unusedSockets)
if unused < server.minPoolSize {
server.Unlock()
continue
}
tmpSlice := make([]*timedMongoSocket, 0, len(server.unusedSockets) - server.minPoolSize)
for _, s := range server.unusedSockets {
if len(tmpSlice) == cap(tmpSlice) {
now := time.Now()
end := 0
// Because the acquirision and recycle are done at the tail of array,
// the head is always the oldest unused socket.
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
break
}
if time.Since(*(s.lastTimeUsed)) > time.Duration(server.maxIdleTimeMS) * time.Millisecond {
tmpSlice = append(tmpSlice, s)
}
end++
}
tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*timedMongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
stats.conn(-1*end, server.info.Master)
}
server.RUnlock()
server.Unlock()

if len(tmpSlice) > 0 {
server.Lock()
for _, s := range tmpSlice {
for i, unused := range server.unusedSockets {
if s.soc == unused.soc {
copy(server.unusedSockets[i:], server.unusedSockets[i+1:])
n := len(server.unusedSockets) - 1
server.unusedSockets[n] = nil
server.unusedSockets = server.unusedSockets[:n]
stats.conn(-1, server.info.Master)
s.soc.Close()
break
}
}
}
server.Unlock()
for _, s := range tbr {
s.soc.Close()
}
}
}
Expand Down

0 comments on commit 7c60f0a

Please sign in to comment.