diff --git a/CHANGELOG.md b/CHANGELOG.md index ad693e0f0..6eb7b550b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support decimal type in msgpack (#96) - Support datetime type in msgpack (#118) - Prepared SQL statements (#117) +- Streams and interactive transactions support (#101) ### Changed diff --git a/connection.go b/connection.go index 6de1e9d01..964e8216f 100644 --- a/connection.go +++ b/connection.go @@ -19,6 +19,7 @@ import ( ) const requestsMap = 128 +const ignoreStreamId = 0 const ( connDisconnected = 0 connConnected = 1 @@ -139,6 +140,8 @@ type Connection struct { state uint32 dec *msgpack.Decoder lenbuf [PacketLengthBytes]byte + + lastStreamId uint64 } var _ = Connector(&Connection{}) // Check compatibility with connector interface. @@ -468,16 +471,27 @@ func (conn *Connection) dial() (err error) { return } -func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) { +func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, + req Request, streamId uint64, res SchemaResolver) (err error) { hl := h.Len() - h.Write([]byte{ + + hMapLen := byte(0x82) // 2 element map. + if streamId != ignoreStreamId { + hMapLen = byte(0x83) // 3 element map. + } + hBytes := []byte{ 0xce, 0, 0, 0, 0, // Length. - 0x82, // 2 element map. + hMapLen, KeyCode, byte(req.Code()), // Request code. KeySync, 0xce, byte(reqid >> 24), byte(reqid >> 16), byte(reqid >> 8), byte(reqid), - }) + } + if streamId != ignoreStreamId { + hBytes = append(hBytes, KeyStreamId, byte(streamId)) + } + + h.Write(hBytes) if err = req.Body(res, enc); err != nil { return @@ -495,7 +509,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) { var packet smallWBuf req := newAuthRequest(conn.opts.User, string(scramble)) - err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema) + err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, ignoreStreamId, conn.Schema) if err != nil { return errors.New("auth: pack error " + err.Error()) @@ -785,16 +799,16 @@ func (conn *Connection) newFuture() (fut *Future) { return } -func (conn *Connection) send(req Request) *Future { +func (conn *Connection) send(req Request, streamId uint64) *Future { fut := conn.newFuture() if fut.ready == nil { return fut } - conn.putFuture(fut, req) + conn.putFuture(fut, req, streamId) return fut } -func (conn *Connection) putFuture(fut *Future, req Request) { +func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] shard.bufmut.Lock() @@ -811,7 +825,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) { } blen := shard.buf.Len() reqid := fut.requestId - if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil { + if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil { shard.buf.Trunc(blen) shard.bufmut.Unlock() if f := conn.fetchFuture(reqid); f == fut { @@ -1000,7 +1014,7 @@ func (conn *Connection) Do(req Request) *Future { return fut } } - return conn.send(req) + return conn.send(req, ignoreStreamId) } // ConfiguredTimeout returns a timeout from connection config. @@ -1026,3 +1040,16 @@ func (conn *Connection) NewPrepared(expr string) (*Prepared, error) { } return NewPreparedFromResponse(conn, resp) } + +// NewStream creates new Stream object for connection. +// +// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. +// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true. +// Since 1.7.0 +func (conn *Connection) NewStream() *Stream { + next := atomic.AddUint64(&conn.lastStreamId, 1) + return &Stream{ + Id: next, + Conn: conn, + } +} diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index ad2e936cc..147cdccb8 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -544,6 +544,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant return conn.Do(req) } +// NewStream creates new Stream object for connection selected +// by userMode from connPool. +// +// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. +// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true. +// Since 1.7.0 +func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + return conn.NewStream(), nil +} + // // private // diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 2e462e4eb..a2fb155c2 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -1369,6 +1369,294 @@ func TestDoWithStrangerConn(t *testing.T) { } } +func TestStream_Commit(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + roles := []bool{true, true, false, true, true} + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + stream, err := connPool.NewStream(connection_pool.PreferRW) + require.Nilf(t, err, "failed to create stream") + require.NotNilf(t, connPool, "stream is nil after NewStream") + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned") + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"commit_key", "commit_value"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned") + + // Connect to servers[2] to check if tuple + // was inserted outside of stream on RW instance + // before transaction commit + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"commit_key"}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "commit_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "commit_value", value, "unexpected body of Select (1)") + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Commit") + require.NotNilf(t, resp, "response is nil after Commit") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Commit: wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "commit_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "commit_value", value, "unexpected body of Select (1)") +} + +func TestStream_Rollback(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + roles := []bool{true, true, false, true, true} + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + stream, err := connPool.NewStream(connection_pool.PreferRW) + require.Nilf(t, err, "failed to create stream") + require.NotNilf(t, connPool, "stream is nil after NewStream") + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned") + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"rollback_key", "rollback_value"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned") + + // Connect to servers[2] to check if tuple + // was not inserted outside of stream on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"rollback_key"}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)") + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Rollback") + require.NotNilf(t, resp, "response is nil after Rollback") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Rollback: wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") +} + +func TestStream_TxnIsolationLevel(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + roles := []bool{true, true, false, true, true} + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + stream, err := connPool.NewStream(connection_pool.PreferRW) + require.Nilf(t, err, "failed to create stream") + require.NotNilf(t, connPool, "stream is nil after NewStream") + + // Begin transaction + req = tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadConfirmedLevel). + WithTimeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned") + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"rollback_key", "rollback_value"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned") + + // Connect to servers[2] to check if tuple + // was not inserted outside of stream on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"rollback_key"}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)") + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Rollback") + require.NotNilf(t, resp, "response is nil after Rollback") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Rollback: wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body @@ -1383,15 +1671,21 @@ func runTestMain(m *testing.M) int { "work_dir1", "work_dir2", "work_dir3", "work_dir4", "work_dir5"} - var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isStreamUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil { + log.Fatalf("Could not check the Tarantool version") + } instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{ - InitScript: initScript, - User: connOpts.User, - Pass: connOpts.Pass, - WaitStart: waitStart, - ConnectRetry: connectRetry, - RetryTimeout: retryTimeout, + InitScript: initScript, + User: connOpts.User, + Pass: connOpts.Pass, + WaitStart: waitStart, + ConnectRetry: connectRetry, + RetryTimeout: retryTimeout, + MemtxUseMvccEngine: !isStreamUnsupported, }) if err != nil { diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go index 08995d03e..4cd188b65 100644 --- a/connection_pool/example_test.go +++ b/connection_pool/example_test.go @@ -2,6 +2,7 @@ package connection_pool_test import ( "fmt" + "time" "github.com/tarantool/go-tarantool" "github.com/tarantool/go-tarantool/connection_pool" @@ -573,3 +574,263 @@ func ExampleConnectionPool_NewPrepared() { fmt.Printf("Failed to prepare") } } + +func ExampleCommitRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + return + } + defer pool.Close() + + // example pool has only one rw instance + stream, err := pool.NewStream(connection_pool.RW) + if err != nil { + fmt.Println(err) + return + } + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"example_commit_key", "example_commit_value"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"example_commit_key"}) + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Commit: %s", err.Error()) + return + } + fmt.Printf("Commit transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + // example pool has only one rw instance + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after commit: response is %#v\n", resp.Data) +} + +func ExampleRollbackRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + // example pool has only one rw instance + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + return + } + defer pool.Close() + + stream, err := pool.NewStream(connection_pool.RW) + if err != nil { + fmt.Println(err) + return + } + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"example_rollback_key", "example_rollback_value"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"example_rollback_key"}) + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + // example pool has only one rw instance + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} + +func ExampleBeginRequest_TxnIsolation() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + // example pool has only one rw instance + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + return + } + defer pool.Close() + + stream, err := pool.NewStream(connection_pool.RW) + if err != nil { + fmt.Println(err) + return + } + + // Begin transaction + req = tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadConfirmedLevel). + WithTimeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"isolation_level_key", "isolation_level_value"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"isolation_level_key"}) + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + // example pool has only one rw instance + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} diff --git a/const.go b/const.go index 3d0d7424f..4a3cb6833 100644 --- a/const.go +++ b/const.go @@ -13,11 +13,15 @@ const ( Call17RequestCode = 10 /* call in >= 1.7 format */ ExecuteRequestCode = 11 PrepareRequestCode = 13 + BeginRequestCode = 14 + CommitRequestCode = 15 + RollbackRequestCode = 16 PingRequestCode = 64 SubscribeRequestCode = 66 KeyCode = 0x00 KeySync = 0x01 + KeyStreamId = 0x0a KeySpaceNo = 0x10 KeyIndexNo = 0x11 KeyLimit = 0x12 @@ -37,6 +41,8 @@ const ( KeySQLBind = 0x41 KeySQLInfo = 0x42 KeyStmtID = 0x43 + KeyTimeout = 0x56 + KeyTxnIsolation = 0x59 KeyFieldName = 0x00 KeyFieldType = 0x01 diff --git a/example_test.go b/example_test.go index 65dc971a0..12841c86a 100644 --- a/example_test.go +++ b/example_test.go @@ -227,6 +227,236 @@ func ExampleUpsertRequest() { // response is []interface {}{[]interface {}{0x459, "first", "updated"}} } +func ExampleCommitRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + conn := example_connect() + defer conn.Close() + + stream := conn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "commit_hello", "commit_world"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Commit: %s", err.Error()) + return + } + fmt.Printf("Commit transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after commit: response is %#v\n", resp.Data) +} + +func ExampleRollbackRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + conn := example_connect() + defer conn.Close() + + stream := conn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(2001), "rollback_hello", "rollback_world"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(2001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} + +func ExampleBeginRequest_TxnIsolation() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + conn := example_connect() + defer conn.Close() + + stream := conn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadConfirmedLevel). + WithTimeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(2001), "rollback_hello", "rollback_world"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(2001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} + func ExampleFuture_GetIterator() { conn := example_connect() defer conn.Close() diff --git a/export_test.go b/export_test.go index 315f444de..6d5d5bf23 100644 --- a/export_test.go +++ b/export_test.go @@ -93,3 +93,21 @@ func RefImplExecutePreparedBody(enc *msgpack.Encoder, stmt Prepared, args interf func RefImplUnprepareBody(enc *msgpack.Encoder, stmt Prepared) error { return fillUnprepare(enc, stmt) } + +// RefImplBeginBody is reference implementation for filling of an begin +// request's body. +func RefImplBeginBody(enc *msgpack.Encoder, txnIsolation uint64, timeout time.Duration) error { + return fillBegin(enc, txnIsolation, timeout) +} + +// RefImplCommitBody is reference implementation for filling of an commit +// request's body. +func RefImplCommitBody(enc *msgpack.Encoder) error { + return fillCommit(enc) +} + +// RefImplRollbackBody is reference implementation for filling of an rollback +// request's body. +func RefImplRollbackBody(enc *msgpack.Encoder) error { + return fillRollback(enc) +} diff --git a/request_test.go b/request_test.go index 7c1805155..ba7deb0ce 100644 --- a/request_test.go +++ b/request_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "testing" + "time" "github.com/stretchr/testify/assert" @@ -22,6 +23,11 @@ const validExpr = "any string" // We don't check the value here. const defaultSpace = 0 // And valid too. const defaultIndex = 0 // And valid too. +const defaultIsolationLevel = 0 +const defaultTimeout = 0 + +const validTimeout = 500 * time.Millisecond + var validStmt *Prepared = &Prepared{StatementID: 1, Conn: &Connection{}} type ValidSchemeResolver struct { @@ -175,6 +181,9 @@ func TestRequestsCodes(t *testing.T) { {req: NewPrepareRequest(validExpr), code: PrepareRequestCode}, {req: NewUnprepareRequest(validStmt), code: PrepareRequestCode}, {req: NewExecutePreparedRequest(validStmt), code: ExecuteRequestCode}, + {req: NewBeginRequest(), code: BeginRequestCode}, + {req: NewCommitRequest(), code: CommitRequestCode}, + {req: NewRollbackRequest(), code: RollbackRequestCode}, } for _, test := range tests { @@ -585,3 +594,59 @@ func TestExecutePreparedRequestDefaultValues(t *testing.T) { assert.Equal(t, req.Conn(), validStmt.Conn) assertBodyEqual(t, refBuf.Bytes(), req) } + +func TestBeginRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplBeginBody(refEnc, defaultIsolationLevel, defaultTimeout) + if err != nil { + t.Errorf("An unexpected RefImplBeginBody() error: %q", err.Error()) + return + } + + req := NewBeginRequest() + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestBeginRequestSetters(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplBeginBody(refEnc, ReadConfirmedLevel, validTimeout) + if err != nil { + t.Errorf("An unexpected RefImplBeginBody() error: %q", err.Error()) + return + } + + req := NewBeginRequest().TxnIsolation(ReadConfirmedLevel).WithTimeout(validTimeout) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestCommitRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplCommitBody(refEnc) + if err != nil { + t.Errorf("An unexpected RefImplCommitBody() error: %q", err.Error()) + return + } + + req := NewCommitRequest() + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestRollbackRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplRollbackBody(refEnc) + if err != nil { + t.Errorf("An unexpected RefImplRollbackBody() error: %q", err.Error()) + return + } + + req := NewRollbackRequest() + assertBodyEqual(t, refBuf.Bytes(), req) +} diff --git a/stream.go b/stream.go new file mode 100644 index 000000000..95e90af15 --- /dev/null +++ b/stream.go @@ -0,0 +1,145 @@ +package tarantool + +import ( + "fmt" + "time" + + "gopkg.in/vmihailenco/msgpack.v2" +) + +const ( + // By default, the isolation level of Tarantool is serializable. + defaultIsolationLevel = iota + // The ReadCommittedLevel isolation level makes visible all transactions + // that started commit (stream.Do(NewCommitRequest()) was called). + ReadCommittedLevel + // The ReadConfirmedLevel isolation level makes visible all transactions + // that finished the commit (stream.Do(NewCommitRequest()) was returned). + ReadConfirmedLevel + // If the BestEffortLevel (serializable) isolation level becomes unreachable, + // the transaction is marked as «conflicted» and can no longer be committed. + BestEffortLevel +) + +type Stream struct { + Id uint64 + Conn *Connection +} + +func fillBegin(enc *msgpack.Encoder, txnIsolation uint64, timeout time.Duration) error { + hasTimeout := timeout > 0 + hasIsolationLevel := txnIsolation != defaultIsolationLevel + mapLen := 0 + if hasTimeout { + mapLen += 1 + } + if hasIsolationLevel { + mapLen += 1 + } + + err := enc.EncodeMapLen(mapLen) + + if hasTimeout { + enc.EncodeUint64(KeyTimeout) + err = enc.Encode(timeout.Seconds()) + } + + if hasIsolationLevel { + enc.EncodeUint64(KeyTxnIsolation) + err = enc.Encode(txnIsolation) + } + + return err +} + +func fillCommit(enc *msgpack.Encoder) error { + return enc.EncodeMapLen(0) +} + +func fillRollback(enc *msgpack.Encoder) error { + return enc.EncodeMapLen(0) +} + +// BeginRequest helps you to create an execute request object for execution +// by a Connection. +type BeginRequest struct { + baseRequest + txnIsolation uint64 + timeout time.Duration +} + +// NewBeginRequest returns a new BeginRequest. +func NewBeginRequest() *BeginRequest { + req := new(BeginRequest) + req.requestCode = BeginRequestCode + return req +} + +// TxnIsolation sets the the transaction isolation level for transaction manager. +// By default, the isolation level of Tarantool is serializable. +func (req *BeginRequest) TxnIsolation(txnIsolation uint64) *BeginRequest { + req.txnIsolation = txnIsolation + return req +} + +// WithTimeout allows to set up a timeout for call BeginRequest. +func (req *BeginRequest) WithTimeout(timeout time.Duration) *BeginRequest { + req.timeout = timeout + return req +} + +// Body fills an encoder with the begin request body. +func (req *BeginRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillBegin(enc, req.txnIsolation, req.timeout) +} + +// CommitRequest helps you to create an execute request object for execution +// by a Connection. +type CommitRequest struct { + baseRequest +} + +// NewCommitRequest returns a new CommitRequest. +func NewCommitRequest() *CommitRequest { + req := new(CommitRequest) + req.requestCode = CommitRequestCode + return req +} + +// Body fills an encoder with the commit request body. +func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillCommit(enc) +} + +// RollbackRequest helps you to create an execute request object for execution +// by a Connection. +type RollbackRequest struct { + baseRequest +} + +// NewRollbackRequest returns a new RollbackRequest. +func NewRollbackRequest() *RollbackRequest { + req := new(RollbackRequest) + req.requestCode = RollbackRequestCode + return req +} + +// Body fills an encoder with the rollback request body. +func (req *RollbackRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillRollback(enc) +} + +// Do verifies, sends the request and returns a future. +// +// An error is returned if the request was formed incorrectly, or failure to +// create the future. +func (s *Stream) Do(req Request) *Future { + if connectedReq, ok := req.(ConnectedRequest); ok { + if connectedReq.Conn() != s.Conn { + fut := NewFuture() + fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool")) + return fut + } + } + return s.Conn.send(req, s.Id) +} diff --git a/tarantool_test.go b/tarantool_test.go index 06771338c..dca628909 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -538,6 +538,19 @@ func BenchmarkSQLSerial(b *testing.B) { /////////////////// +func deleteByKey(conn *Connection, space interface{}, index interface{}, key []interface{}) { + req := NewDeleteRequest(space). + Index(index). + Key(key) + resp, err := conn.Do(req).Get() + if err != nil { + panic("Failed to Delete: " + err.Error()) + } + if resp == nil { + panic("Response is nil after Select") + } +} + func TestClient(t *testing.T) { var resp *Response var err error @@ -2110,21 +2123,371 @@ func TestComplexStructs(t *testing.T) { } } +func TestStream_Commit(t *testing.T) { + var req Request + var resp *Response + var err error + var conn *Connection + + test_helpers.SkipIfStreamsUnsupported(t) + + conn = test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + stream := conn.NewStream() + + // Begin transaction + req = NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer deleteByKey(conn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Commit transaction + req = NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Commit: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Commit: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } +} + +func TestStream_Rollback(t *testing.T) { + var req Request + var resp *Response + var err error + var conn *Connection + + test_helpers.SkipIfStreamsUnsupported(t) + + conn = test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + stream := conn.NewStream() + + // Begin transaction + req = NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer deleteByKey(conn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Rollback transaction + req = NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Rollback: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Rollback: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } +} + +func TestStream_TxnIsolationLevel(t *testing.T) { + var req Request + var resp *Response + var err error + var conn *Connection + + test_helpers.SkipIfStreamsUnsupported(t) + + conn = test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + stream := conn.NewStream() + + // Begin transaction + req = NewBeginRequest().TxnIsolation(ReadConfirmedLevel).WithTimeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer deleteByKey(conn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Rollback transaction + req = NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Rollback: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Rollback: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body // is a separate function, see // https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls func runTestMain(m *testing.M) int { + // Tarantool supports streams and interactive transactions since version 2.10.0 + isStreamUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil { + log.Fatalf("Could not check the Tarantool version") + } + inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{ - InitScript: "config.lua", - Listen: server, - WorkDir: "work_dir", - User: opts.User, - Pass: opts.Pass, - WaitStart: 100 * time.Millisecond, - ConnectRetry: 3, - RetryTimeout: 500 * time.Millisecond, + InitScript: "config.lua", + Listen: server, + WorkDir: "work_dir", + User: opts.User, + Pass: opts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, + MemtxUseMvccEngine: !isStreamUnsupported, }) defer test_helpers.StopTarantoolWithCleanup(inst) diff --git a/test_helpers/main.go b/test_helpers/main.go index 5c9d5135e..03cf42cba 100644 --- a/test_helpers/main.go +++ b/test_helpers/main.go @@ -72,6 +72,10 @@ type StartOpts struct { // RetryTimeout is a time between tarantool ping retries. RetryTimeout time.Duration + + // MemtxUseMvccEngine is flag to enable transactional + // manager if set to true. + MemtxUseMvccEngine bool } // TarantoolInstance is a data for instance graceful shutdown and cleanup. @@ -190,6 +194,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) { os.Environ(), fmt.Sprintf("TEST_TNT_WORK_DIR=%s", startOpts.WorkDir), fmt.Sprintf("TEST_TNT_LISTEN=%s", startOpts.Listen), + fmt.Sprintf("TT_MEMTX_USE_MVCC_ENGINE=%t", startOpts.MemtxUseMvccEngine), ) // Clean up existing work_dir. diff --git a/test_helpers/utils.go b/test_helpers/utils.go index e07f34bf8..38927e3cc 100644 --- a/test_helpers/utils.go +++ b/test_helpers/utils.go @@ -36,3 +36,17 @@ func SkipIfSQLUnsupported(t testing.TB) { t.Skip() } } + +func SkipIfStreamsUnsupported(t *testing.T) { + t.Helper() + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, err := IsTarantoolVersionLess(2, 10, 0) + if err != nil { + t.Fatalf("Could not check the Tarantool version") + } + + if isLess { + t.Skip("Skipping test for Tarantool without streams support") + } +}