Skip to content

Commit

Permalink
refactor: replace buraksezer/pool with buraksezer/connpool
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jun 16, 2021
1 parent b5a0f44 commit bdcce47
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/buraksezer/olric
go 1.15

require (
github.com/buraksezer/connpool v0.4.0
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/buraksezer/pool v3.0.0+incompatible
github.com/cespare/xxhash v1.1.0
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/hashicorp/go-multierror v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/buraksezer/connpool v0.4.0 h1:fNLvWu0FOtJxL7Sqetm6+040mhZWVs8c6vrke14SEKw=
github.com/buraksezer/connpool v0.4.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0=
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
github.com/buraksezer/pool v3.0.0+incompatible h1:MXcI3YkBnElnbJ8ZPIYqa0dia3qqHDNto1E004YxseA=
Expand Down
29 changes: 16 additions & 13 deletions internal/transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package transport

import (
"context"
"fmt"
"net"
"os"
"sync"

"github.com/buraksezer/connpool"
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/pool"
)

// Client is the client implementation for the internal TCP server.
Expand All @@ -32,7 +33,7 @@ type Client struct {

dialer *net.Dialer
config *config.Client
pools map[string]pool.Pool
pools map[string]connpool.Pool
}

// NewClient returns a new Client.
Expand All @@ -49,7 +50,7 @@ func NewClient(cc *config.Client) *Client {
c := &Client{
dialer: dialer,
config: cc,
pools: make(map[string]pool.Pool),
pools: make(map[string]connpool.Pool),
}
return c
}
Expand All @@ -62,7 +63,7 @@ func (c *Client) Close() {
p.Close()
}
// Reset pool
c.pools = make(map[string]pool.Pool)
c.pools = make(map[string]connpool.Pool)
}

// ClosePool closes the underlying connections in a pool,
Expand All @@ -81,34 +82,36 @@ func (c *Client) ClosePool(addr string) {
}

// pool creates a new pool for a given addr or returns an exiting one.
func (c *Client) pool(addr string) (pool.Pool, error) {
func (c *Client) pool(addr string) (connpool.Pool, error) {
factory := func() (net.Conn, error) {
return c.dialer.Dial("tcp", addr)
}

c.mu.Lock()
defer c.mu.Unlock()

cpool, ok := c.pools[addr]
p, ok := c.pools[addr]
if ok {
return cpool, nil
return p, nil
}

cpool, err := pool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory)
p, err := connpool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory)
if err != nil {
return nil, err
}
c.pools[addr] = cpool
return cpool, nil
c.pools[addr] = p
return p, nil
}

func (c *Client) conn(addr string) (net.Conn, error) {
cpool, err := c.pool(addr)
p, err := c.pool(addr)
if err != nil {
return nil, err
}

conn, err := cpool.Get()
// Use context.Background here because we dont want to change the default
// behaviour.
conn, err := p.Get(context.Background())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,7 +141,7 @@ func (c *Client) teardownConn(rawConn net.Conn, dead bool) {
c.teardownConnWithTimeout(rawConn.(*ConnWithTimeout), dead)
return
}
pc, _ := rawConn.(*pool.PoolConn)
pc, _ := rawConn.(*connpool.PoolConn)
pc.MarkUnusable()
err := pc.Close()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/transport/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"io"
"os"

"github.com/buraksezer/connpool"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/pool"
)

func readFromStream(conn io.ReadWriteCloser, bufCh chan<- protocol.EncodeDecoder, errCh chan<- error) {
Expand Down Expand Up @@ -62,14 +62,14 @@ func (c *Client) CreateStream(ctx context.Context, addr string, read chan<- prot
return err
}

conn, err := p.Get()
conn, err := p.Get(ctx)
if err != nil {
return err
}

defer func() {
// marks the connection not usable any more, to let the pool close it instead of returning it to pool.
pc, _ := conn.(*pool.PoolConn)
pc, _ := conn.(*connpool.PoolConn)
pc.MarkUnusable()
if err = pc.Close(); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to close connection: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions internal/transport/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"net"
"time"

"github.com/buraksezer/pool"
"github.com/buraksezer/connpool"
)

// ConnWithTimeout denotes a composite type which can used to implement i/o timeout feature for TCP sockets.
Expand Down Expand Up @@ -60,17 +60,17 @@ func (c *ConnWithTimeout) UnsetDeadline() error {
return c.Conn.SetDeadline(time.Time{})
}

// MarkUnusable() marks the connection not usable any more, to let the pool close it instead of returning it to pool.
// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
// Wrapper around pool.PoolConn.MarkUnusable
func (c *ConnWithTimeout) MarkUnusable() {
if conn, ok := c.Conn.(*pool.PoolConn); ok {
if conn, ok := c.Conn.(*connpool.PoolConn); ok {
conn.MarkUnusable()
}
}

// Close closes the connection.
func (c *ConnWithTimeout) Close() error {
conn, ok := c.Conn.(*pool.PoolConn)
conn, ok := c.Conn.(*connpool.PoolConn)
if ok {
return conn.Close()
}
Expand Down

0 comments on commit bdcce47

Please sign in to comment.