Skip to content

Commit

Permalink
streams: interactive transactions and support
Browse files Browse the repository at this point in the history
The main purpose of streams is transactions via iproto.
Since v. 2.10.0, Tarantool supports streams and
interactive transactions over them. Each stream
can start its own transaction, so they allows
multiplexing several transactions over one connection.

API for this feature is the following:

* `NewStream()` method to create a stream object for `Connection`
   and `NewStream(userMode Mode)` method to create a stream object
   for `ConnectionPool`
*  stream object `Stream` with `Do()`, `Begin()`,
   `Commit()`, `Rollback()` methods,
   `Begin()` - start transaction via iproto stream;
   `Commit()` - commit transaction;
   `Rollback()` - rollback transaction.

Closes #101
  • Loading branch information
AnaNek committed Jul 4, 2022
1 parent 9b0ec8a commit 3ce90ac
Show file tree
Hide file tree
Showing 9 changed files with 786 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Public API with request object types (#126)
- Support decimal type in msgpack (#96)
- Support datetime type in msgpack (#118)
- Streams and interactive transactions support (#101)

### Changed

Expand Down
34 changes: 26 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
)

const requestsMap = 128
const defaultStreamId = 0
const (
connDisconnected = 0
connConnected = 1
Expand Down Expand Up @@ -139,6 +140,8 @@ type Connection struct {
state uint32
dec *msgpack.Decoder
lenbuf [PacketLengthBytes]byte

lastStreamId uint32
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -468,15 +471,17 @@ func (conn *Connection) dial() (err error) {
return
}

func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
req Request, streamId uint32, res SchemaResolver) (err error) {
hl := h.Len()
h.Write([]byte{
0xce, 0, 0, 0, 0, // Length.
0x82, // 2 element map.
0x83, // 3 element map.
KeyCode, byte(req.Code()), // Request code.
KeySync, 0xce,
byte(reqid >> 24), byte(reqid >> 16),
byte(reqid >> 8), byte(reqid),
KeyStreamId, byte(streamId),
})

if err = req.Body(res, enc); err != nil {
Expand All @@ -495,7 +500,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
var packet smallWBuf
req := newAuthRequest(conn.opts.User, string(scramble))
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, defaultStreamId, conn.Schema)

if err != nil {
return errors.New("auth: pack error " + err.Error())
Expand Down Expand Up @@ -785,16 +790,16 @@ func (conn *Connection) newFuture() (fut *Future) {
return
}

func (conn *Connection) send(req Request) *Future {
func (conn *Connection) send(req Request, streamId uint32) *Future {
fut := conn.newFuture()
if fut.ready == nil {
return fut
}
conn.putFuture(fut, req)
conn.putFuture(fut, req, streamId)
return fut
}

func (conn *Connection) putFuture(fut *Future, req Request) {
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint32) {
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.bufmut.Lock()
Expand All @@ -811,7 +816,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
}
blen := shard.buf.Len()
reqid := fut.requestId
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
shard.buf.Trunc(blen)
shard.bufmut.Unlock()
if f := conn.fetchFuture(reqid); f == fut {
Expand Down Expand Up @@ -993,7 +998,7 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
// An error is returned if the request was formed incorrectly, or failed to
// create the future.
func (conn *Connection) Do(req Request) *Future {
return conn.send(req)
return conn.send(req, defaultStreamId)
}

// ConfiguredTimeout returns a timeout from connection config.
Expand All @@ -1009,3 +1014,16 @@ func (conn *Connection) OverrideSchema(s *Schema) {
conn.Schema = s
}
}

// NewStream creates new Stream object for connection.
//
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
// Since 1.7.0
func (conn *Connection) NewStream() *Stream {
conn.lastStreamId += 1
return &Stream{
Id: conn.lastStreamId,
Conn: conn,
}
}
14 changes: 14 additions & 0 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant
return conn.Do(req)
}

// NewStream creates new Stream object for connection selected
// by userMode from connPool.
//
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
// Since 1.7.0
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return nil, err
}
return conn.NewStream(), nil
}

//
// private
//
Expand Down
223 changes: 217 additions & 6 deletions connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var defaultTimeoutRetry = 500 * time.Millisecond

var instances []test_helpers.TarantoolInstance

var tarantoolVersionIsLess bool

func TestConnError_IncorrectParams(t *testing.T) {
connPool, err := connection_pool.Connect([]string{}, tarantool.Opts{})
require.Nilf(t, connPool, "conn is not nil with incorrect param")
Expand Down Expand Up @@ -1276,6 +1278,203 @@ func TestDo(t *testing.T) {
require.NotNilf(t, resp, "response is nil after Ping")
}

func TestStream_Commit(t *testing.T) {
var req tarantool.Request
var resp *tarantool.Response
var err error

// Tarantool supports streams and interactive transactions since version 2.10.0
if tarantoolVersionIsLess {
t.Skip()
}

roles := []bool{true, true, false, true, true}

err = test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := connection_pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")
defer connPool.Close()

stream, err := connPool.NewStream(connection_pool.PreferRW)
require.Nilf(t, err, "failed to create stream")
require.NotNilf(t, connPool, "stream is nil after NewStream")

// Begin transaction
resp, err = stream.Begin()
require.Nilf(t, err, "failed to Begin")
require.NotNilf(t, resp, "response is nil after Begin")
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned")

// Insert in stream
req = tarantool.NewInsertRequest(spaceName).
Tuple([]interface{}{"commit_key", "commit_value"})
resp, err = stream.Do(req).Get()
require.Nilf(t, err, "failed to Insert")
require.NotNilf(t, resp, "response is nil after Insert")
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned")

// Connect to servers[2] to check if tuple
// was inserted outside of stream on RW instance
// before transaction commit
conn, err := tarantool.Connect(servers[2], connOpts)
require.Nilf(t, err, "failed to connect %s", servers[2])
require.NotNilf(t, conn, "conn is nil after Connect")

// Select not related to the transaction
// while transaction is not committed
// result of select is empty
req = tarantool.NewSelectRequest(spaceNo).
Index(indexNo).
Offset(0).
Limit(1).
Iterator(tarantool.IterEq).
Key([]interface{}{"commit_key"})
resp, err = conn.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")

// Select in stream
resp, err = stream.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")

tpl, ok := resp.Data[0].([]interface{})
require.Truef(t, ok, "unexpected body of Select")
require.Equalf(t, 2, len(tpl), "unexpected body of Select")

key, ok := tpl[0].(string)
require.Truef(t, ok, "unexpected body of Select (0)")
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")

value, ok := tpl[1].(string)
require.Truef(t, ok, "unexpected body of Select (1)")
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")

// Commit transaction
resp, err = stream.Commit()
require.Nilf(t, err, "failed to Commit")
require.NotNilf(t, resp, "response is nil after Commit")
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Commit: wrong code returned")

// Select outside of transaction
resp, err = conn.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select")

tpl, ok = resp.Data[0].([]interface{})
require.Truef(t, ok, "unexpected body of Select")
require.Equalf(t, 2, len(tpl), "unexpected body of Select")

key, ok = tpl[0].(string)
require.Truef(t, ok, "unexpected body of Select (0)")
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")

value, ok = tpl[1].(string)
require.Truef(t, ok, "unexpected body of Select (1)")
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
}

func TestStream_Rollback(t *testing.T) {
var req tarantool.Request
var resp *tarantool.Response
var err error

// Tarantool supports streams and interactive transactions since version 2.10.0
if tarantoolVersionIsLess {
t.Skip()
}

roles := []bool{true, true, false, true, true}

err = test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := connection_pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")
defer connPool.Close()

stream, err := connPool.NewStream(connection_pool.PreferRW)
require.Nilf(t, err, "failed to create stream")
require.NotNilf(t, connPool, "stream is nil after NewStream")

// Begin transaction
resp, err = stream.Begin()
require.Nilf(t, err, "failed to Begin")
require.NotNilf(t, resp, "response is nil after Begin")
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned")

// Insert in stream
req = tarantool.NewInsertRequest(spaceName).
Tuple([]interface{}{"rollback_key", "rollback_value"})
resp, err = stream.Do(req).Get()
require.Nilf(t, err, "failed to Insert")
require.NotNilf(t, resp, "response is nil after Insert")
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned")

// Connect to servers[2] to check if tuple
// was not inserted outside of stream on RW instance
conn, err := tarantool.Connect(servers[2], connOpts)
require.Nilf(t, err, "failed to connect %s", servers[2])
require.NotNilf(t, conn, "conn is nil after Connect")

// Select not related to the transaction
// while transaction is not committed
// result of select is empty
req = tarantool.NewSelectRequest(spaceNo).
Index(indexNo).
Offset(0).
Limit(1).
Iterator(tarantool.IterEq).
Key([]interface{}{"rollback_key"})
resp, err = conn.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")

// Select in stream
resp, err = stream.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")

tpl, ok := resp.Data[0].([]interface{})
require.Truef(t, ok, "unexpected body of Select")
require.Equalf(t, 2, len(tpl), "unexpected body of Select")

key, ok := tpl[0].(string)
require.Truef(t, ok, "unexpected body of Select (0)")
require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)")

value, ok := tpl[1].(string)
require.Truef(t, ok, "unexpected body of Select (1)")
require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)")

// Rollback transaction
resp, err = stream.Rollback()
require.Nilf(t, err, "failed to Rollback")
require.NotNilf(t, resp, "response is nil after Rollback")
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Rollback: wrong code returned")

// Select outside of transaction
resp, err = conn.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select")

// Select inside of stream after rollback
resp, err = stream.Do(req).Get()
require.Nilf(t, err, "failed to Select")
require.NotNilf(t, resp, "response is nil after Select")
require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select")
}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
Expand All @@ -1292,13 +1491,25 @@ func runTestMain(m *testing.M) int {
"work_dir5"}
var err error

memtxUseMvccEngine := true

// Tarantool supports streams and interactive transactions since version 2.10.0
tarantoolVersionIsLess, err = test_helpers.IsTarantoolVersionLess(2, 10, 0)
if err != nil {
log.Fatalf("Could not check the Tarantool version")
}
if tarantoolVersionIsLess {
memtxUseMvccEngine = false
}

instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{
InitScript: initScript,
User: connOpts.User,
Pass: connOpts.Pass,
WaitStart: waitStart,
ConnectRetry: connectRetry,
RetryTimeout: retryTimeout,
InitScript: initScript,
User: connOpts.User,
Pass: connOpts.Pass,
WaitStart: waitStart,
ConnectRetry: connectRetry,
RetryTimeout: retryTimeout,
MemtxUseMvccEngine: memtxUseMvccEngine,
})

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ const (
UpsertRequestCode = 9
Call17RequestCode = 10 /* call in >= 1.7 format */
ExecuteRequestCode = 11
BeginRequestCode = 14
CommitRequestCode = 15
RollbackRequestCode = 16
PingRequestCode = 64
SubscribeRequestCode = 66

KeyCode = 0x00
KeySync = 0x01
KeyStreamId = 0x0a
KeySpaceNo = 0x10
KeyIndexNo = 0x11
KeyLimit = 0x12
Expand Down
Loading

0 comments on commit 3ce90ac

Please sign in to comment.