Skip to content

Commit

Permalink
connection: make dialer mandatory
Browse files Browse the repository at this point in the history
This patch modifies `Connect` api.

Now, to connect to the Tarantool, you need to pass an
object that satisfies `tarantool.Dialer` interface.

You can use one of the existing implementations:
`TtDialer` or `OpenSslDialer`.

For example:
```
conn, err := tarantool.Connect(context.Background(), tarantool.TtDialer{
	Address:  "127.0.0.1:3301",
	User:     "user",
	Password: "secret",
}, tarantool.Opts{})
```

To create a connection pool, you need to pass a `map[string]tarantool.Dialer`,
where each dialer is associated with a unique ID (for example, it can be
the server address). Connections will be distinguished from each other using
these IDs.

For example:
```
connPool, err := pool.Connect(context.Background(), map[string]tarantool.Dialer{
	"127.0.0.1": tarantool.TtDialer{
		Address:  "127.0.0.1",
		User:     "user",
		Password: "secret",
	},
}, tarantool.Opts{})
```

The `conn.RemoteAddr` and `conn.LocalAddr` functions have been removed.
To obtain the connection address, you can use `conn.Addr`. This function
panics if the connection has not been established.

Now, `NewWatcher` checks the actual features of the server, rather than relying
on the features provided by the user during connection creation.

In the case of connection pool, watchers are created for connections that support
this feature.

`ClientProtocolInfo`, `ServerProtocolInfo` were removed. Now, there is `ProtocolInfo`,
which returns the server protocol info.

Part of #321
  • Loading branch information
askalt committed Nov 17, 2023
1 parent 8d4fedd commit c468873
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 449 deletions.
151 changes: 23 additions & 128 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"log"
"math"
"net"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -89,16 +90,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
case LogReconnectFailed:
reconnects := v[0].(uint)
err := v[1].(error)
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
reconnects, conn.opts.MaxReconnects, conn.addr, err)
log.Printf("tarantool: reconnect (%d/%d) failed: %s",
reconnects, conn.opts.MaxReconnects, err)
case LogLastReconnectFailed:
err := v[0].(error)
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.addr, err)
log.Printf("tarantool: last reconnect failed: %s, giving it up", err)
case LogUnexpectedResultId:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.addr, resp.RequestId)
log.Printf("tarantool: got unexpected resultId (%d) in response", resp.RequestId)
case LogWatchEventReadFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
Expand Down Expand Up @@ -156,10 +155,10 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c Conn
mutex sync.Mutex
cond *sync.Cond
dialer Dialer
c Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// schemaResolver contains a SchemaResolver implementation.
Expand Down Expand Up @@ -262,11 +261,6 @@ const (

// Opts is a way to configure Connection
type Opts struct {
// Auth is an authentication method.
Auth Auth
// Dialer is a Dialer object used to create a new connection to a
// Tarantool instance. TtDialer is a default one.
Dialer Dialer
// Timeout for response to a particular request. The timeout is reset when
// push messages are received. If Timeout is zero, any request can be
// blocked infinitely.
Expand All @@ -289,10 +283,6 @@ type Opts struct {
// endlessly.
// After MaxReconnects attempts Connection becomes closed.
MaxReconnects uint
// Username for logging in to Tarantool.
User string
// User password for logging in to Tarantool.
Pass string
// RateLimit limits number of 'in-fly' request, i.e. already put into
// requests queue, but not yet answered by server or timeouted.
// It is disabled by default.
Expand All @@ -317,83 +307,23 @@ type Opts struct {
Handle interface{}
// Logger is user specified logger used for error messages.
Logger Logger
// Transport is the connection type, by default the connection is unencrypted.
Transport string
// SslOpts is used only if the Transport == 'ssl' is set.
Ssl SslOpts
// RequiredProtocolInfo contains minimal protocol version and
// list of protocol features that should be supported by
// Tarantool server. By default there are no restrictions.
RequiredProtocolInfo ProtocolInfo
}

// SslOpts is a way to configure ssl transport.
type SslOpts struct {
// KeyFile is a path to a private SSL key file.
KeyFile string
// CertFile is a path to an SSL certificate file.
CertFile string
// CaFile is a path to a trusted certificate authorities (CA) file.
CaFile string
// Ciphers is a colon-separated (:) list of SSL cipher suites the connection
// can use.
//
// We don't provide a list of supported ciphers. This is what OpenSSL
// does. The only limitation is usage of TLSv1.2 (because other protocol
// versions don't seem to support the GOST cipher). To add additional
// ciphers (GOST cipher), you must configure OpenSSL.
//
// See also
//
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
Ciphers string
// Password is a password for decrypting the private SSL key file.
// The priority is as follows: try to decrypt with Password, then
// try PasswordFile.
Password string
// PasswordFile is a path to the list of passwords for decrypting
// the private SSL key file. The connection tries every line from the
// file as a password.
PasswordFile string
}

// Clone returns a copy of the Opts object.
// Any changes in copy RequiredProtocolInfo will not affect the original
// RequiredProtocolInfo value.
func (opts Opts) Clone() Opts {
optsCopy := opts
optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()

return optsCopy
}

// Connect creates and configures a new Connection.
//
// Address could be specified in following ways:
//
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
//
// - Unix socket, first '/' or '.' indicates Unix socket
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
dialer: dialer,
requestId: 0,
contextRequestId: 1,
Greeting: &Greeting{},
control: make(chan struct{}),
opts: opts.Clone(),
opts: opts,
dec: msgpack.NewDecoder(&smallBuf{}),
}
maxprocs := uint32(runtime.GOMAXPROCS(-1))
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
conn.opts.Concurrency = maxprocs * 4
}
if conn.opts.Dialer == nil {
conn.opts.Dialer = TtDialer{}
}
if c := conn.opts.Concurrency; c&(c-1) != 0 {
for i := uint(1); i < 32; i *= 2 {
c |= c >> i
Expand Down Expand Up @@ -474,28 +404,9 @@ func (conn *Connection) CloseGraceful() error {
}

// Addr returns a configured address of Tarantool socket.
func (conn *Connection) Addr() string {
return conn.addr
}

// RemoteAddr returns an address of Tarantool socket.
func (conn *Connection) RemoteAddr() string {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if conn.c == nil {
return ""
}
return conn.c.RemoteAddr().String()
}

// LocalAddr returns an address of outgoing socket.
func (conn *Connection) LocalAddr() string {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if conn.c == nil {
return ""
}
return conn.c.LocalAddr().String()
// It panics, if the connection has not been successfully established.
func (conn *Connection) Addr() net.Addr {
return conn.c.Addr()
}

// Handle returns a user-specified handle from Opts.
Expand All @@ -514,14 +425,8 @@ func (conn *Connection) dial(ctx context.Context) error {
opts := conn.opts

var c Conn
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
IoTimeout: opts.Timeout,
Transport: opts.Transport,
Ssl: opts.Ssl,
RequiredProtocol: opts.RequiredProtocolInfo,
Auth: opts.Auth,
User: opts.User,
Password: opts.Pass,
c, err := conn.dialer.Dial(ctx, DialOpts{
IoTimeout: opts.Timeout,
})
if err != nil {
return err
Expand Down Expand Up @@ -1447,8 +1352,7 @@ func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) boo

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require IPROTO_FEATURE_WATCHERS to use watchers, see examples
// for the function.
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
Expand Down Expand Up @@ -1484,9 +1388,9 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// That's why we can't just check the Tarantool response for an unsupported
// request error.
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
conn.opts.RequiredProtocolInfo.Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
conn.c.ProtocolInfo().Features) {
err := fmt.Errorf("the feature %s must be supported by connection "+
"to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
return nil, err
}

Expand Down Expand Up @@ -1577,23 +1481,14 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
}, nil
}

// ServerProtocolVersion returns protocol version and protocol features
// ProtocolInfo returns protocol version and protocol features
// supported by connected Tarantool server. Beware that values might be
// outdated if connection is in a disconnected state.
// Since 1.10.0
func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
func (conn *Connection) ProtocolInfo() ProtocolInfo {
return conn.serverProtocolInfo.Clone()
}

// ClientProtocolVersion returns protocol version and protocol features
// supported by Go connection client.
// Since 1.10.0
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
info := clientProtocolInfo.Clone()
info.Auth = conn.opts.Auth
return info
}

func shutdownEventCallback(event WatchEvent) {
// Receives "true" on server shutdown.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
Expand Down
Loading

0 comments on commit c468873

Please sign in to comment.