Skip to content

Commit

Permalink
api: make dialer mandatory
Browse files Browse the repository at this point in the history
This patch modifies `Connect` api.

Now, to connect to the Tarantool, you need to pass an
object that satisfies `tarantool.Dialer` interface.

You can use one of the existing implementations:
`NetDialer` or `OpenSslDialer`.

For example:
```
conn, err := tarantool.Connect(context.Background(), tarantool.NetDialer{
	Address:  "127.0.0.1:3301",
	User:     "user",
	Password: "secret",
}, tarantool.Opts{})
```

To create a connection pool, you need to pass a `map[string]tarantool.Dialer`,
where each dialer is associated with a unique ID (for example, it can be
the server address). Dialers will be distinguished from each other using
these IDs.

For example:
```
connPool, err := pool.Connect(context.Background(), map[string]tarantool.Dialer{
	"127.0.0.1": tarantool.NetDialer{
		Address:  "127.0.0.1",
		User:     "user",
		Password: "secret",
	},
}, tarantool.Opts{})
```

The `conn.RemoteAddr` and `conn.LocalAddr` functions have been removed.
To obtain the connection address, you can use `conn.Addr`.

Now, `NewWatcher` checks the actual features of the server, rather than relying
on the features provided by the user during connection creation.

In the case of connection pool, watchers are created for connections that support
this feature.

`ClientProtocolInfo`, `ServerProtocolInfo` were removed. Now, there is `ProtocolInfo`,
which returns the server protocol info.

`pool.GetPoolInfo` was renamed to `pool.GetInfo`. Return type changed to
`map[string]ConnectionInfo`.

Part of #321
  • Loading branch information
askalt committed Nov 24, 2023
1 parent a0ce232 commit f8f5ff8
Show file tree
Hide file tree
Showing 37 changed files with 1,861 additions and 1,691 deletions.
12 changes: 6 additions & 6 deletions box_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestErrorTypeMPEncodeDecode(t *testing.T) {
func TestErrorTypeEval(t *testing.T) {
test_helpers.SkipIfErrorMessagePackTypeUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

for name, testcase := range tupleCases {
Expand All @@ -318,7 +318,7 @@ func TestErrorTypeEval(t *testing.T) {
func TestErrorTypeEvalTyped(t *testing.T) {
test_helpers.SkipIfErrorMessagePackTypeUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

for name, testcase := range tupleCases {
Expand All @@ -336,7 +336,7 @@ func TestErrorTypeEvalTyped(t *testing.T) {
func TestErrorTypeInsert(t *testing.T) {
test_helpers.SkipIfErrorMessagePackTypeUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestErrorTypeInsert(t *testing.T) {
func TestErrorTypeInsertTyped(t *testing.T) {
test_helpers.SkipIfErrorMessagePackTypeUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
Expand Down Expand Up @@ -416,7 +416,7 @@ func TestErrorTypeInsertTyped(t *testing.T) {
func TestErrorTypeSelect(t *testing.T) {
test_helpers.SkipIfErrorMessagePackTypeUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
Expand Down Expand Up @@ -461,7 +461,7 @@ func TestErrorTypeSelect(t *testing.T) {
func TestErrorTypeSelectTyped(t *testing.T) {
test_helpers.SkipIfErrorMessagePackTypeUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

truncateEval := fmt.Sprintf("box.space[%q]:truncate()", space)
Expand Down
148 changes: 23 additions & 125 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"log"
"math"
"net"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -90,15 +91,15 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
reconnects := v[0].(uint)
err := v[1].(error)
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
reconnects, conn.opts.MaxReconnects, conn.addr, err)
reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
case LogLastReconnectFailed:
err := v[0].(error)
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.addr, err)
conn.Addr(), err)
case LogUnexpectedResultId:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.addr, resp.RequestId)
conn.Addr(), resp.RequestId)
case LogWatchEventReadFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
Expand Down Expand Up @@ -156,10 +157,11 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c Conn
mutex sync.Mutex
cond *sync.Cond
addr net.Addr
dialer Dialer
c Conn
mutex sync.Mutex
cond *sync.Cond
// schemaResolver contains a SchemaResolver implementation.
schemaResolver SchemaResolver
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -260,11 +262,6 @@ const (

// Opts is a way to configure Connection
type Opts struct {
// Auth is an authentication method.
Auth Auth
// Dialer is a Dialer object used to create a new connection to a
// Tarantool instance. TtDialer is a default one.
Dialer Dialer
// Timeout for response to a particular request. The timeout is reset when
// push messages are received. If Timeout is zero, any request can be
// blocked infinitely.
Expand All @@ -287,10 +284,6 @@ type Opts struct {
// endlessly.
// After MaxReconnects attempts Connection becomes closed.
MaxReconnects uint
// Username for logging in to Tarantool.
User string
// User password for logging in to Tarantool.
Pass string
// RateLimit limits number of 'in-fly' request, i.e. already put into
// requests queue, but not yet answered by server or timeouted.
// It is disabled by default.
Expand All @@ -315,83 +308,23 @@ type Opts struct {
Handle interface{}
// Logger is user specified logger used for error messages.
Logger Logger
// Transport is the connection type, by default the connection is unencrypted.
Transport string
// SslOpts is used only if the Transport == 'ssl' is set.
Ssl SslOpts
// RequiredProtocolInfo contains minimal protocol version and
// list of protocol features that should be supported by
// Tarantool server. By default there are no restrictions.
RequiredProtocolInfo ProtocolInfo
}

// SslOpts is a way to configure ssl transport.
type SslOpts struct {
// KeyFile is a path to a private SSL key file.
KeyFile string
// CertFile is a path to an SSL certificate file.
CertFile string
// CaFile is a path to a trusted certificate authorities (CA) file.
CaFile string
// Ciphers is a colon-separated (:) list of SSL cipher suites the connection
// can use.
//
// We don't provide a list of supported ciphers. This is what OpenSSL
// does. The only limitation is usage of TLSv1.2 (because other protocol
// versions don't seem to support the GOST cipher). To add additional
// ciphers (GOST cipher), you must configure OpenSSL.
//
// See also
//
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
Ciphers string
// Password is a password for decrypting the private SSL key file.
// The priority is as follows: try to decrypt with Password, then
// try PasswordFile.
Password string
// PasswordFile is a path to the list of passwords for decrypting
// the private SSL key file. The connection tries every line from the
// file as a password.
PasswordFile string
}

// Clone returns a copy of the Opts object.
// Any changes in copy RequiredProtocolInfo will not affect the original
// RequiredProtocolInfo value.
func (opts Opts) Clone() Opts {
optsCopy := opts
optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()

return optsCopy
}

// Connect creates and configures a new Connection.
//
// Address could be specified in following ways:
//
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
//
// - Unix socket, first '/' or '.' indicates Unix socket
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
dialer: dialer,
requestId: 0,
contextRequestId: 1,
Greeting: &Greeting{},
control: make(chan struct{}),
opts: opts.Clone(),
opts: opts,
dec: msgpack.NewDecoder(&smallBuf{}),
}
maxprocs := uint32(runtime.GOMAXPROCS(-1))
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
conn.opts.Concurrency = maxprocs * 4
}
if conn.opts.Dialer == nil {
conn.opts.Dialer = TtDialer{}
}
if c := conn.opts.Concurrency; c&(c-1) != 0 {
for i := uint(1); i < 32; i *= 2 {
c |= c >> i
Expand Down Expand Up @@ -474,30 +407,10 @@ func (conn *Connection) CloseGraceful() error {
}

// Addr returns a configured address of Tarantool socket.
func (conn *Connection) Addr() string {
func (conn *Connection) Addr() net.Addr {
return conn.addr
}

// RemoteAddr returns an address of Tarantool socket.
func (conn *Connection) RemoteAddr() string {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if conn.c == nil {
return ""
}
return conn.c.RemoteAddr().String()
}

// LocalAddr returns an address of outgoing socket.
func (conn *Connection) LocalAddr() string {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if conn.c == nil {
return ""
}
return conn.c.LocalAddr().String()
}

// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
return conn.opts.Handle
Expand All @@ -514,19 +427,14 @@ func (conn *Connection) dial(ctx context.Context) error {
opts := conn.opts

var c Conn
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
IoTimeout: opts.Timeout,
Transport: opts.Transport,
Ssl: opts.Ssl,
RequiredProtocol: opts.RequiredProtocolInfo,
Auth: opts.Auth,
User: opts.User,
Password: opts.Pass,
c, err := conn.dialer.Dial(ctx, DialOpts{
IoTimeout: opts.Timeout,
})
if err != nil {
return err
}

conn.addr = c.Addr()
conn.Greeting.Version = c.Greeting().Version
conn.serverProtocolInfo = c.ProtocolInfo()

Expand Down Expand Up @@ -1453,8 +1361,7 @@ func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) boo

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require IPROTO_FEATURE_WATCHERS to use watchers, see examples
// for the function.
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
Expand Down Expand Up @@ -1490,9 +1397,9 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// That's why we can't just check the Tarantool response for an unsupported
// request error.
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
conn.opts.RequiredProtocolInfo.Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
conn.c.ProtocolInfo().Features) {
err := fmt.Errorf("the feature %s must be supported by connection "+
"to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
return nil, err
}

Expand Down Expand Up @@ -1583,23 +1490,14 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
}, nil
}

// ServerProtocolVersion returns protocol version and protocol features
// ProtocolInfo returns protocol version and protocol features
// supported by connected Tarantool server. Beware that values might be
// outdated if connection is in a disconnected state.
// Since 1.10.0
func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
// Since 2.0.0
func (conn *Connection) ProtocolInfo() ProtocolInfo {
return conn.serverProtocolInfo.Clone()
}

// ClientProtocolVersion returns protocol version and protocol features
// supported by Go connection client.
// Since 1.10.0
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
info := clientProtocolInfo.Clone()
info.Auth = conn.opts.Auth
return info
}

func shutdownEventCallback(event WatchEvent) {
// Receives "true" on server shutdown.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
Expand Down
32 changes: 0 additions & 32 deletions connection_test.go

This file was deleted.

10 changes: 7 additions & 3 deletions crud/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ const (

var exampleOpts = tarantool.Opts{
Timeout: 5 * time.Second,
User: "test",
Pass: "test",
}

var exampleDialer = tarantool.NetDialer{
Address: exampleServer,
User: "test",
Password: "test",
}

func exampleConnect() *tarantool.Connection {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, exampleServer, exampleOpts)
conn, err := tarantool.Connect(ctx, exampleDialer, exampleOpts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
Expand Down
Loading

0 comments on commit f8f5ff8

Please sign in to comment.