From 2337105299b6611cc8ada83b4d90af050072ab41 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 3 Apr 2017 10:57:32 -0700 Subject: [PATCH 01/20] unclean fix works --- .../java/com/microsoft/sqlserver/jdbc/IOBuffer.java | 7 +++++++ .../java/com/microsoft/sqlserver/jdbc/Parameter.java | 10 ---------- .../microsoft/sqlserver/jdbc/SQLServerResource.java | 1 - 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 507e65cfa..766e5fbce 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4536,6 +4536,9 @@ void writeTVPRows(TVP value) throws SQLServerException { boolean isShortValue, isNull; int dataLength; + ByteBuffer tempStagingBuffer = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); + tempStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + if (!value.isNull()) { Map columnMetadata = value.getColumnMetadata(); Iterator> columnsIterator; @@ -4749,8 +4752,12 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) } currentColumn++; } + tempStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); } } + stagingBuffer.clear(); + stagingBuffer.put(tempStagingBuffer.array(), 0, tempStagingBuffer.position()); + // TVP_END_TOKEN writeByte((byte) 0x00); } diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/Parameter.java b/src/main/java/com/microsoft/sqlserver/jdbc/Parameter.java index 6c12683c4..9595cab70 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/Parameter.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/Parameter.java @@ -331,16 +331,6 @@ else if (value instanceof SQLServerDataTable) { tvpValue = new TVP(tvpName, (SQLServerDataTable) value); } else if (value instanceof ResultSet) { - // if ResultSet and PreparedStatemet/CallableStatement are created from same connection object - // with property SelectMethod=cursor, TVP is not supported - if (con.getSelectMethod().equalsIgnoreCase("cursor") && (value instanceof SQLServerResultSet)) { - SQLServerStatement stmt = (SQLServerStatement) ((SQLServerResultSet) value).getStatement(); - - if (con.equals(stmt.connection)) { - throw new SQLServerException(SQLServerException.getErrString("R_invalidServerCursorForTVP"), null); - } - } - tvpValue = new TVP(tvpName, (ResultSet) value); } else if (value instanceof ISQLServerDataRecord) { diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResource.java b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResource.java index 53d9ddf6c..3503b6015 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResource.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResource.java @@ -369,7 +369,6 @@ protected Object[][] getContents() { {"R_invalidKeyStoreFile", "Cannot parse \"{0}\". Either the file format is not valid or the password is not correct."}, // for JKS/PKCS {"R_invalidCEKCacheTtl", "Invalid column encryption key cache time-to-live specified. The columnEncryptionKeyCacheTtl value cannot be negative and timeUnit can only be DAYS, HOURS, MINUTES or SECONDS."}, {"R_sendTimeAsDateTimeForAE", "Use sendTimeAsDateTime=false with Always Encrypted."}, - {"R_invalidServerCursorForTVP" , "Use different Connection for source ResultSet and prepared query, if selectMethod is set to cursor for Table-Valued Parameter."}, {"R_TVPnotWorkWithSetObjectResultSet" , "setObject() with ResultSet is not supported for Table-Valued Parameter. Please use setStructured()"}, {"R_invalidQueryTimeout", "The queryTimeout {0} is not valid."}, {"R_invalidSocketTimeout", "The socketTimeout {0} is not valid."}, From dc99efd2958f5af154b7cd8b9caabe551735382d Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 3 Apr 2017 14:00:26 -0700 Subject: [PATCH 02/20] clean the logic --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 35 ++++++++++++++++--- .../sqlserver/jdbc/SQLServerResultSet.java | 4 +++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 766e5fbce..3e9f39431 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4535,11 +4535,29 @@ void writeTVP(TVP value) throws SQLServerException { void writeTVPRows(TVP value) throws SQLServerException { boolean isShortValue, isNull; int dataLength; + + boolean tdsWritterCached = false; - ByteBuffer tempStagingBuffer = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); - tempStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + ByteBuffer cachedStagingBuffer = null; if (!value.isNull()) { + + // If TVP is set with ResultSet and Server Cursor is used, the tdsWriter of the calling preparedStatement is overwritten + // by the SQLServerResultSet#next() method if the preparedStatement and the ResultSet are created by the same connection. + // Therefore, we need to cache the tdsWriter's value and update it with new TDS values. + if (TVPType.ResultSet == value.tvpType) { + if ((null != value.sourceResultSet) && (value.sourceResultSet instanceof SQLServerResultSet)) { + SQLServerStatement src_stmt = (SQLServerStatement) ((SQLServerResultSet) value.sourceResultSet).getStatement(); + int resultSetServerCursorId = ((SQLServerResultSet) value.sourceResultSet).getServerCursorId(); + + if (con.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) { + cachedStagingBuffer = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); + cachedStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + tdsWritterCached = true; + } + } + } + Map columnMetadata = value.getColumnMetadata(); Iterator> columnsIterator; @@ -4752,11 +4770,18 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) } currentColumn++; } - tempStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + + if (tdsWritterCached) { + cachedStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + stagingBuffer.clear(); + } } } - stagingBuffer.clear(); - stagingBuffer.put(tempStagingBuffer.array(), 0, tempStagingBuffer.position()); + + if (tdsWritterCached) { + stagingBuffer.clear(); + stagingBuffer.put(cachedStagingBuffer.array(), 0, cachedStagingBuffer.position()); + } // TVP_END_TOKEN writeByte((byte) 0x00); diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java index 33446ed72..cde46e195 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java @@ -83,6 +83,10 @@ String getClassNameLogging() { private boolean isClosed = false; private final int serverCursorId; + + int getServerCursorId() { + return serverCursorId; + } /** the intended fetch direction to optimize cursor performance */ private int fetchDirection; From b0f1bbc1e7e1233c028934b53136521da51fa381 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 3 Apr 2017 18:49:55 -0700 Subject: [PATCH 03/20] detect if needs to read TVP response --- .../java/com/microsoft/sqlserver/jdbc/IOBuffer.java | 13 +++++++++++-- .../sqlserver/jdbc/SQLServerConnection.java | 2 ++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 3e9f39431..bd958c402 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4781,6 +4781,8 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) if (tdsWritterCached) { stagingBuffer.clear(); stagingBuffer.put(cachedStagingBuffer.array(), 0, cachedStagingBuffer.position()); + + con.needsToReadTVPResponse = true; } // TVP_END_TOKEN @@ -6237,8 +6239,15 @@ private boolean nextPacket() throws SQLServerException { * that is trying to buffer it with TDSCommand.detach(). */ synchronized final boolean readPacket() throws SQLServerException { - if (null != command && !command.readingResponse()) - return false; + + if (null != command && !command.readingResponse()){ + + if(!con.needsToReadTVPResponse){ + return false; + } + + con.needsToReadTVPResponse = false; + } // Number of packets in should always be less than number of packets out. // If the server has been notified for an interrupt, it may be less by diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java index e3e4a6ba7..627167294 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java @@ -81,6 +81,8 @@ public class SQLServerConnection implements ISQLServerConnection { long timerExpire; boolean attemptRefreshTokenLocked = false; + + boolean needsToReadTVPResponse = false; private boolean fedAuthRequiredByUser = false; private boolean fedAuthRequiredPreLoginResponse = false; From f4725994ed1a5b8debc28c787c39111ceacf2178 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 4 Apr 2017 13:17:56 -0700 Subject: [PATCH 04/20] cache TDS command before overwriting it --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 24 +++++++++---------- .../sqlserver/jdbc/SQLServerConnection.java | 2 -- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index bd958c402..18f5b1218 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4539,12 +4539,14 @@ void writeTVPRows(TVP value) throws SQLServerException { boolean tdsWritterCached = false; ByteBuffer cachedStagingBuffer = null; + TDSCommand cachedCommand = null; if (!value.isNull()) { // If TVP is set with ResultSet and Server Cursor is used, the tdsWriter of the calling preparedStatement is overwritten // by the SQLServerResultSet#next() method if the preparedStatement and the ResultSet are created by the same connection. - // Therefore, we need to cache the tdsWriter's value and update it with new TDS values. + // Therefore, we need to cache the tdsWriter's values (stagingBuffer for sending data and command for retrieving data) and update + // stagingBuffer with new TDS values. if (TVPType.ResultSet == value.tvpType) { if ((null != value.sourceResultSet) && (value.sourceResultSet instanceof SQLServerResultSet)) { SQLServerStatement src_stmt = (SQLServerStatement) ((SQLServerResultSet) value.sourceResultSet).getStatement(); @@ -4553,6 +4555,9 @@ void writeTVPRows(TVP value) throws SQLServerException { if (con.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) { cachedStagingBuffer = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); cachedStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + + cachedCommand = this.command; + tdsWritterCached = true; } } @@ -4770,19 +4775,18 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) } currentColumn++; } - + if (tdsWritterCached) { cachedStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); stagingBuffer.clear(); } } } - + if (tdsWritterCached) { stagingBuffer.clear(); stagingBuffer.put(cachedStagingBuffer.array(), 0, cachedStagingBuffer.position()); - - con.needsToReadTVPResponse = true; + this.command = cachedCommand; } // TVP_END_TOKEN @@ -6239,14 +6243,8 @@ private boolean nextPacket() throws SQLServerException { * that is trying to buffer it with TDSCommand.detach(). */ synchronized final boolean readPacket() throws SQLServerException { - - if (null != command && !command.readingResponse()){ - - if(!con.needsToReadTVPResponse){ - return false; - } - - con.needsToReadTVPResponse = false; + if (null != command && !command.readingResponse()) { + return false; } // Number of packets in should always be less than number of packets out. diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java index 627167294..e3e4a6ba7 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java @@ -81,8 +81,6 @@ public class SQLServerConnection implements ISQLServerConnection { long timerExpire; boolean attemptRefreshTokenLocked = false; - - boolean needsToReadTVPResponse = false; private boolean fedAuthRequiredByUser = false; private boolean fedAuthRequiredPreLoginResponse = false; From c942b5c8f87e8facf35189ee7c84775f4676830a Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 4 Apr 2017 13:25:21 -0700 Subject: [PATCH 05/20] clean code --- src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 18f5b1218..014011691 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4537,7 +4537,6 @@ void writeTVPRows(TVP value) throws SQLServerException { int dataLength; boolean tdsWritterCached = false; - ByteBuffer cachedStagingBuffer = null; TDSCommand cachedCommand = null; @@ -6243,9 +6242,8 @@ private boolean nextPacket() throws SQLServerException { * that is trying to buffer it with TDSCommand.detach(). */ synchronized final boolean readPacket() throws SQLServerException { - if (null != command && !command.readingResponse()) { + if (null != command && !command.readingResponse()) return false; - } // Number of packets in should always be less than number of packets out. // If the server has been notified for an interrupt, it may be less by From 0503347cb6ed43557c22f9595bc38a60a43fef0a Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 4 Apr 2017 18:10:46 -0700 Subject: [PATCH 06/20] added tests --- .../jdbc/tvp/TVPResultSetCursorTest.java | 177 ++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java new file mode 100644 index 000000000..405c08a5f --- /dev/null +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -0,0 +1,177 @@ +/* + * Microsoft JDBC Driver for SQL Server + * + * Copyright(c) Microsoft Corporation All rights reserved. + * + * This program is made available under the terms of the MIT License. See the LICENSE file in the project root for more information. + */ +package com.microsoft.sqlserver.jdbc.tvp; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.Properties; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement; +import com.microsoft.sqlserver.testframework.AbstractTest; + +@RunWith(JUnitPlatform.class) +public class TVPResultSetCursorTest extends AbstractTest { + + private static Connection conn = null; + static Statement stmt = null; + + static BigDecimal[] expectedBigDecimals = {new BigDecimal("12345.12345"), new BigDecimal("125.123"), new BigDecimal("45.12345")}; + static String[] expectedBigDecimalStrings = {"12345.12345", "125.12300", "45.12345"}; + + static String[] expectedStrings = {"hello", "world", "!!!"}; + + static Timestamp[] expectedTimestamps = {new Timestamp(1433338533461L), new Timestamp(14917485583999L), new Timestamp(1491123533000L)}; + static String[] expectedTimestampStrings = {"2015-06-03 06:35:33.4610000", "2442-09-18 18:59:43.9990000", "2017-04-02 01:58:53.0000000"}; + + private static String tvpName = "TVPResultSetCursorTest_TVP"; + private static String srcTable = "TVPResultSetCursorTest_SourceTable"; + private static String desTable = "TVPResultSetCursorTest_DestinationTable"; + + /** + * Test a previous failure when using server cursor and using the same connection to create TVP and result set. + * + * @throws SQLException + */ + @Test + public void testServerCursors() throws SQLException { + conn = DriverManager.getConnection(connectionString); + stmt = conn.createStatement(); + + dropTVPS(); + dropTables(); + + createTVPS(); + createTables(); + + populateSourceTable(); + + ResultSet rs = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE).executeQuery("select * from " + srcTable); + + SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt.setStructured(1, tvpName, rs); + pstmt.execute(); + + verifyDestinationTableData(); + + if (null != pstmt) { + pstmt.close(); + } + if (null != rs) { + rs.close(); + } + } + + /** + * Test a previous failure when setting SelectMethod to cursor and using the same connection to create TVP and result set. + * + * @throws SQLException + */ + @Test + public void testSelectMethodSetToCursor() throws SQLException { + Properties info = new Properties(); + info.setProperty("SelectMethod", "cursor"); + conn = DriverManager.getConnection(connectionString, info); + + stmt = conn.createStatement(); + + dropTVPS(); + dropTables(); + + createTVPS(); + createTables(); + + populateSourceTable(); + + ResultSet rs = conn.createStatement().executeQuery("select * from " + srcTable); + + SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt.setStructured(1, tvpName, rs); + pstmt.execute(); + + verifyDestinationTableData(); + + if (null != pstmt) { + pstmt.close(); + } + if (null != rs) { + rs.close(); + } + } + + private static void verifyDestinationTableData() throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("select * from " + desTable); + + int i = 0; + while (rs.next()) { + assertTrue(rs.getString(1).equals(expectedBigDecimalStrings[i])); + assertTrue(rs.getString(2).trim().equals(expectedStrings[i])); + assertTrue(rs.getString(3).equals(expectedTimestampStrings[i])); + i++; + } + + assertTrue(i == expectedBigDecimals.length); + } + + private static void populateSourceTable() throws SQLException { + String sql = "insert into " + srcTable + " values (?,?,?)"; + + SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement(sql); + + for (int i = 0; i < expectedBigDecimals.length; i++) { + pstmt.setBigDecimal(1, expectedBigDecimals[i]); + pstmt.setString(2, expectedStrings[i]); + pstmt.setTimestamp(3, expectedTimestamps[i]); + pstmt.execute(); + } + } + + private static void dropTables() throws SQLException { + stmt.executeUpdate("if object_id('" + srcTable + "','U') is not null" + " drop table " + srcTable); + stmt.executeUpdate("if object_id('" + desTable + "','U') is not null" + " drop table " + desTable); + } + + private static void createTables() throws SQLException { + String sql = "create table " + srcTable + " (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null);"; + stmt.execute(sql); + + sql = "create table " + desTable + " (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null);"; + stmt.execute(sql); + } + + private static void createTVPS() throws SQLException { + String TVPCreateCmd = "CREATE TYPE " + tvpName + " as table (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null)"; + stmt.executeUpdate(TVPCreateCmd); + } + + private static void dropTVPS() throws SQLException { + stmt.executeUpdate("IF EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = '" + tvpName + "') " + " drop type " + tvpName); + } + + @AfterEach + private void terminateVariation() throws SQLException { + if (null != conn) { + conn.close(); + } + if (null != stmt) { + stmt.close(); + } + } + +} \ No newline at end of file From 798021ae601d727d82824aceb358305006e4b1f7 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Wed, 5 Apr 2017 09:00:02 -0700 Subject: [PATCH 07/20] print out expected value and actual value --- .../sqlserver/jdbc/tvp/TVPResultSetCursorTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index 405c08a5f..169b7fad7 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -120,9 +120,12 @@ private static void verifyDestinationTableData() throws SQLException { int i = 0; while (rs.next()) { - assertTrue(rs.getString(1).equals(expectedBigDecimalStrings[i])); - assertTrue(rs.getString(2).trim().equals(expectedStrings[i])); - assertTrue(rs.getString(3).equals(expectedTimestampStrings[i])); + assertTrue(rs.getString(1).equals(expectedBigDecimalStrings[i]), + "Expected Value:" + expectedBigDecimalStrings[i] + ", Actual Value: " + rs.getString(1)); + assertTrue(rs.getString(2).trim().equals(expectedStrings[i]), + "Expected Value:" + expectedStrings[i] + ", Actual Value: " + rs.getString(2)); + assertTrue(rs.getString(3).equals(expectedTimestampStrings[i]), + "Expected Value:" + expectedTimestampStrings[i] + ", Actual Value: " + rs.getString(3)); i++; } From 207cf4bf11d405e62b90b046d82cbc1e65d7914b Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Wed, 5 Apr 2017 09:21:33 -0700 Subject: [PATCH 08/20] use calendar to specify time zone --- .../sqlserver/jdbc/tvp/TVPResultSetCursorTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index 169b7fad7..401938ffd 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -16,7 +16,9 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.util.Calendar; import java.util.Properties; +import java.util.TimeZone; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -38,7 +40,7 @@ public class TVPResultSetCursorTest extends AbstractTest { static String[] expectedStrings = {"hello", "world", "!!!"}; static Timestamp[] expectedTimestamps = {new Timestamp(1433338533461L), new Timestamp(14917485583999L), new Timestamp(1491123533000L)}; - static String[] expectedTimestampStrings = {"2015-06-03 06:35:33.4610000", "2442-09-18 18:59:43.9990000", "2017-04-02 01:58:53.0000000"}; + static String[] expectedTimestampStrings = {"2015-06-03 13:35:33.4610000", "2442-09-19 01:59:43.9990000", "2017-04-02 08:58:53.0000000"}; private static String tvpName = "TVPResultSetCursorTest_TVP"; private static String srcTable = "TVPResultSetCursorTest_SourceTable"; @@ -135,12 +137,14 @@ private static void verifyDestinationTableData() throws SQLException { private static void populateSourceTable() throws SQLException { String sql = "insert into " + srcTable + " values (?,?,?)"; + Calendar calGMT = Calendar.getInstance(TimeZone.getTimeZone("GMT")); + SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement(sql); for (int i = 0; i < expectedBigDecimals.length; i++) { pstmt.setBigDecimal(1, expectedBigDecimals[i]); pstmt.setString(2, expectedStrings[i]); - pstmt.setTimestamp(3, expectedTimestamps[i]); + pstmt.setTimestamp(3, expectedTimestamps[i], calGMT); pstmt.execute(); } } From f998e19d13d246b046377bd09ef1d69b936031d1 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Wed, 5 Apr 2017 10:35:22 -0700 Subject: [PATCH 09/20] use Utils.dropTableIfExists to drop source & dest tables --- .../microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index 401938ffd..fcee06467 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -27,6 +27,7 @@ import com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement; import com.microsoft.sqlserver.testframework.AbstractTest; +import com.microsoft.sqlserver.testframework.Utils; @RunWith(JUnitPlatform.class) public class TVPResultSetCursorTest extends AbstractTest { @@ -150,8 +151,8 @@ private static void populateSourceTable() throws SQLException { } private static void dropTables() throws SQLException { - stmt.executeUpdate("if object_id('" + srcTable + "','U') is not null" + " drop table " + srcTable); - stmt.executeUpdate("if object_id('" + desTable + "','U') is not null" + " drop table " + desTable); + Utils.dropTableIfExists(srcTable, stmt); + Utils.dropTableIfExists(desTable, stmt); } private static void createTables() throws SQLException { From 253b852015c4e22d33e7da50772b52a91c54cdaf Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Wed, 5 Apr 2017 11:25:06 -0700 Subject: [PATCH 10/20] add test with multiple prepared statements and result sets --- .../jdbc/tvp/TVPResultSetCursorTest.java | 92 +++++++++++++++++-- 1 file changed, 82 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index fcee06467..7d935dc1f 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -71,7 +71,7 @@ public void testServerCursors() throws SQLException { pstmt.setStructured(1, tvpName, rs); pstmt.execute(); - verifyDestinationTableData(); + verifyDestinationTableData(expectedBigDecimals.length); if (null != pstmt) { pstmt.close(); @@ -108,7 +108,7 @@ public void testSelectMethodSetToCursor() throws SQLException { pstmt.setStructured(1, tvpName, rs); pstmt.execute(); - verifyDestinationTableData(); + verifyDestinationTableData(expectedBigDecimals.length); if (null != pstmt) { pstmt.close(); @@ -118,21 +118,93 @@ public void testSelectMethodSetToCursor() throws SQLException { } } - private static void verifyDestinationTableData() throws SQLException { + /** + * test with multiple prepared statements and result sets + * + * @throws SQLException + */ + @Test + public void testMultiplePreparedStatementAndResultSet() throws SQLException { + conn = DriverManager.getConnection(connectionString); + + stmt = conn.createStatement(); + + dropTVPS(); + dropTables(); + + createTVPS(); + createTables(); + + populateSourceTable(); + + ResultSet rs = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE).executeQuery("select * from " + srcTable); + + SQLServerPreparedStatement pstmt1 = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt1.setStructured(1, tvpName, rs); + pstmt1.execute(); + verifyDestinationTableData(expectedBigDecimals.length); + + rs.beforeFirst(); + pstmt1 = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt1.setStructured(1, tvpName, rs); + pstmt1.execute(); + verifyDestinationTableData(expectedBigDecimals.length * 2); + + rs.beforeFirst(); + SQLServerPreparedStatement pstmt2 = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt2.setStructured(1, tvpName, rs); + pstmt2.execute(); + verifyDestinationTableData(expectedBigDecimals.length * 3); + + String sql = "insert into " + desTable + " values (?,?,?)"; + Calendar calGMT = Calendar.getInstance(TimeZone.getTimeZone("GMT")); + pstmt1 = (SQLServerPreparedStatement) conn.prepareStatement(sql); + for (int i = 0; i < expectedBigDecimals.length; i++) { + pstmt1.setBigDecimal(1, expectedBigDecimals[i]); + pstmt1.setString(2, expectedStrings[i]); + pstmt1.setTimestamp(3, expectedTimestamps[i], calGMT); + pstmt1.execute(); + } + verifyDestinationTableData(expectedBigDecimals.length * 4); + + ResultSet rs2 = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE).executeQuery("select * from " + srcTable); + + pstmt1 = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt1.setStructured(1, tvpName, rs2); + pstmt1.execute(); + verifyDestinationTableData(expectedBigDecimals.length * 5); + + if (null != pstmt1) { + pstmt1.close(); + } + if (null != pstmt2) { + pstmt2.close(); + } + if (null != rs) { + rs.close(); + } + if (null != rs2) { + rs2.close(); + } + } + + private static void verifyDestinationTableData(int expectedNumberOfRows) throws SQLException { ResultSet rs = conn.createStatement().executeQuery("select * from " + desTable); + int expectedArrayLength = expectedBigDecimals.length; + int i = 0; while (rs.next()) { - assertTrue(rs.getString(1).equals(expectedBigDecimalStrings[i]), - "Expected Value:" + expectedBigDecimalStrings[i] + ", Actual Value: " + rs.getString(1)); - assertTrue(rs.getString(2).trim().equals(expectedStrings[i]), - "Expected Value:" + expectedStrings[i] + ", Actual Value: " + rs.getString(2)); - assertTrue(rs.getString(3).equals(expectedTimestampStrings[i]), - "Expected Value:" + expectedTimestampStrings[i] + ", Actual Value: " + rs.getString(3)); + assertTrue(rs.getString(1).equals(expectedBigDecimalStrings[i % expectedArrayLength]), + "Expected Value:" + expectedBigDecimalStrings[i % expectedArrayLength] + ", Actual Value: " + rs.getString(1)); + assertTrue(rs.getString(2).trim().equals(expectedStrings[i % expectedArrayLength]), + "Expected Value:" + expectedStrings[i % expectedArrayLength] + ", Actual Value: " + rs.getString(2)); + assertTrue(rs.getString(3).equals(expectedTimestampStrings[i % expectedArrayLength]), + "Expected Value:" + expectedTimestampStrings[i % expectedArrayLength] + ", Actual Value: " + rs.getString(3)); i++; } - assertTrue(i == expectedBigDecimals.length); + assertTrue(i == expectedNumberOfRows); } private static void populateSourceTable() throws SQLException { From 7b2a280d4679fae9de8e1a8160b6070df39e980e Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 10 Apr 2017 12:42:06 -0700 Subject: [PATCH 11/20] send TVP row by row to bypass MARS --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 014011691..c72962dac 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4537,23 +4537,22 @@ void writeTVPRows(TVP value) throws SQLServerException { int dataLength; boolean tdsWritterCached = false; - ByteBuffer cachedStagingBuffer = null; + ByteBuffer cachedTVPHeaders = null; TDSCommand cachedCommand = null; if (!value.isNull()) { - // If TVP is set with ResultSet and Server Cursor is used, the tdsWriter of the calling preparedStatement is overwritten - // by the SQLServerResultSet#next() method if the preparedStatement and the ResultSet are created by the same connection. - // Therefore, we need to cache the tdsWriter's values (stagingBuffer for sending data and command for retrieving data) and update - // stagingBuffer with new TDS values. + // If the preparedStatement and the ResultSet are created by the same connection, and TVP is set with ResultSet and Server Cursor + // is used, the tdsWriter of the calling preparedStatement is overwritten by the SQLServerResultSet#next() method when fetching new rows. + // Therefore, we need to send TVP data row by row before fetching new row. if (TVPType.ResultSet == value.tvpType) { if ((null != value.sourceResultSet) && (value.sourceResultSet instanceof SQLServerResultSet)) { SQLServerStatement src_stmt = (SQLServerStatement) ((SQLServerResultSet) value.sourceResultSet).getStatement(); int resultSetServerCursorId = ((SQLServerResultSet) value.sourceResultSet).getServerCursorId(); if (con.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) { - cachedStagingBuffer = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); - cachedStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); + cachedTVPHeaders = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); + cachedTVPHeaders.put(stagingBuffer.array(), 0, stagingBuffer.position()); cachedCommand = this.command; @@ -4566,6 +4565,13 @@ void writeTVPRows(TVP value) throws SQLServerException { Iterator> columnsIterator; while (value.next()) { + + // restore TDS header that has been overwritten + if (tdsWritterCached) { + stagingBuffer.clear(); + writeBytes(cachedTVPHeaders.array(), 0, cachedTVPHeaders.position()); + } + Object[] rowData = value.getRowData(); // ROW @@ -4775,21 +4781,25 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) currentColumn++; } + // send this row and read its response if (tdsWritterCached) { - cachedStagingBuffer.put(stagingBuffer.array(), 0, stagingBuffer.position()); - stagingBuffer.clear(); + // TVP_END_TOKEN + writeByte((byte) 0x00); + + writePacket(TDS.STATUS_BIT_EOM); + + command = cachedCommand; + command.setReadingResponse(true); + while (tdsChannel.getReader(command).readPacket()) + ; } } } - if (tdsWritterCached) { - stagingBuffer.clear(); - stagingBuffer.put(cachedStagingBuffer.array(), 0, cachedStagingBuffer.position()); - this.command = cachedCommand; + if (!tdsWritterCached) { + // TVP_END_TOKEN + writeByte((byte) 0x00); } - - // TVP_END_TOKEN - writeByte((byte) 0x00); } private static byte[] toByteArray(String s) { @@ -7027,6 +7037,10 @@ boolean attentionPending() { // or by detaching. private volatile boolean readingResponse; + void setReadingResponse(boolean readingResponse) { + this.readingResponse = readingResponse; + } + final boolean readingResponse() { return readingResponse; } From f432926293f525757ad79c5ab347fb03c7c8ddcd Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 10 Apr 2017 13:27:19 -0700 Subject: [PATCH 12/20] fix assertion errors --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index c72962dac..71fc7aec7 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4566,8 +4566,10 @@ void writeTVPRows(TVP value) throws SQLServerException { while (value.next()) { - // restore TDS header that has been overwritten + // restore command and TDS header, which have been overwritten by value.next() if (tdsWritterCached) { + command = cachedCommand; + stagingBuffer.clear(); writeBytes(cachedTVPHeaders.array(), 0, cachedTVPHeaders.position()); } @@ -4781,22 +4783,29 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) currentColumn++; } - // send this row and read its response + // send this row, read its response and reset command status if (tdsWritterCached) { // TVP_END_TOKEN writeByte((byte) 0x00); writePacket(TDS.STATUS_BIT_EOM); - command = cachedCommand; - command.setReadingResponse(true); while (tdsChannel.getReader(command).readPacket()) ; + + command.setInterruptsEnabled(true); + command.setRequestComplete(false); } } } - if (!tdsWritterCached) { + // reset command status which have been overwritten + if (tdsWritterCached) { + command.setRequestComplete(false); + command.setInterruptsEnabled(true); + command.setProcessedResponse(false); + } + else { // TVP_END_TOKEN writeByte((byte) 0x00); } @@ -7000,6 +7009,10 @@ final void log(Level level, // If the command is interrupted after interrupts have been disabled, then the // interrupt is ignored. private volatile boolean interruptsEnabled = false; + + void setInterruptsEnabled(boolean interruptsEnabled) { + this.interruptsEnabled = interruptsEnabled; + } // Flag set to indicate that an interrupt has happened. private volatile boolean wasInterrupted = false; @@ -7016,6 +7029,10 @@ private boolean wasInterrupted() { // thread's responsibility to send the attention signal to the server if necessary. // After the request is complete, the interrupting thread must send the attention signal. private volatile boolean requestComplete; + + void setRequestComplete(boolean requestComplete) { + this.requestComplete = requestComplete; + } // Flag set when an attention signal has been sent to the server, indicating that a // TDS packet containing the attention ack message is to be expected in the response. @@ -7030,6 +7047,10 @@ boolean attentionPending() { // there may be unprocessed information left in the response, such as transaction // ENVCHANGE notifications. private volatile boolean processedResponse; + + void setProcessedResponse(boolean processedResponse) { + this.processedResponse = processedResponse; + } // Flag set when this command's response is ready to be read from the server and cleared // after its response has been received, but not necessarily processed, up to and including @@ -7037,10 +7058,6 @@ boolean attentionPending() { // or by detaching. private volatile boolean readingResponse; - void setReadingResponse(boolean readingResponse) { - this.readingResponse = readingResponse; - } - final boolean readingResponse() { return readingResponse; } From 34cad3a1ee1aad8790a3b5924ea78eeb4eb3895e Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 10 Apr 2017 15:32:04 -0700 Subject: [PATCH 13/20] added tests for chars longer than 5000 --- .../jdbc/tvp/TVPResultSetCursorTest.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index 7d935dc1f..23137f9a2 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -156,13 +156,14 @@ public void testMultiplePreparedStatementAndResultSet() throws SQLException { pstmt2.execute(); verifyDestinationTableData(expectedBigDecimals.length * 3); - String sql = "insert into " + desTable + " values (?,?,?)"; + String sql = "insert into " + desTable + " values (?,?,?,?)"; Calendar calGMT = Calendar.getInstance(TimeZone.getTimeZone("GMT")); pstmt1 = (SQLServerPreparedStatement) conn.prepareStatement(sql); for (int i = 0; i < expectedBigDecimals.length; i++) { pstmt1.setBigDecimal(1, expectedBigDecimals[i]); pstmt1.setString(2, expectedStrings[i]); pstmt1.setTimestamp(3, expectedTimestamps[i], calGMT); + pstmt1.setString(4, expectedStrings[i]); pstmt1.execute(); } verifyDestinationTableData(expectedBigDecimals.length * 4); @@ -201,6 +202,8 @@ private static void verifyDestinationTableData(int expectedNumberOfRows) throws "Expected Value:" + expectedStrings[i % expectedArrayLength] + ", Actual Value: " + rs.getString(2)); assertTrue(rs.getString(3).equals(expectedTimestampStrings[i % expectedArrayLength]), "Expected Value:" + expectedTimestampStrings[i % expectedArrayLength] + ", Actual Value: " + rs.getString(3)); + assertTrue(rs.getString(4).trim().equals(expectedStrings[i % expectedArrayLength]), + "Expected Value:" + expectedStrings[i % expectedArrayLength] + ", Actual Value: " + rs.getString(4)); i++; } @@ -208,7 +211,7 @@ private static void verifyDestinationTableData(int expectedNumberOfRows) throws } private static void populateSourceTable() throws SQLException { - String sql = "insert into " + srcTable + " values (?,?,?)"; + String sql = "insert into " + srcTable + " values (?,?,?,?)"; Calendar calGMT = Calendar.getInstance(TimeZone.getTimeZone("GMT")); @@ -218,6 +221,7 @@ private static void populateSourceTable() throws SQLException { pstmt.setBigDecimal(1, expectedBigDecimals[i]); pstmt.setString(2, expectedStrings[i]); pstmt.setTimestamp(3, expectedTimestamps[i], calGMT); + pstmt.setString(4, expectedStrings[i]); pstmt.execute(); } } @@ -228,20 +232,21 @@ private static void dropTables() throws SQLException { } private static void createTables() throws SQLException { - String sql = "create table " + srcTable + " (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null);"; + String sql = "create table " + srcTable + " (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null, c4 char(7000));"; stmt.execute(sql); - sql = "create table " + desTable + " (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null);"; + sql = "create table " + desTable + " (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null, c4 char(7000));"; stmt.execute(sql); } private static void createTVPS() throws SQLException { - String TVPCreateCmd = "CREATE TYPE " + tvpName + " as table (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null)"; - stmt.executeUpdate(TVPCreateCmd); + String TVPCreateCmd = "CREATE TYPE " + tvpName + + " as table (c1 decimal(10,5) null, c2 nchar(50) null, c3 datetime2(7) null, c4 char(7000) null)"; + stmt.execute(TVPCreateCmd); } private static void dropTVPS() throws SQLException { - stmt.executeUpdate("IF EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = '" + tvpName + "') " + " drop type " + tvpName); + stmt.execute("IF EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = '" + tvpName + "') " + " drop type " + tvpName); } @AfterEach From 4a3435c0c18a3b316fb668d650ec3fd798b4192f Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 11 Apr 2017 15:07:36 -0700 Subject: [PATCH 14/20] fix issue with forward only cursor regarding to reading data from Result set --- .../com/microsoft/sqlserver/jdbc/IOBuffer.java | 15 ++++++++++----- .../sqlserver/jdbc/SQLServerResultSet.java | 4 ++-- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 71fc7aec7..1eb67e88e 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4547,8 +4547,9 @@ void writeTVPRows(TVP value) throws SQLServerException { // Therefore, we need to send TVP data row by row before fetching new row. if (TVPType.ResultSet == value.tvpType) { if ((null != value.sourceResultSet) && (value.sourceResultSet instanceof SQLServerResultSet)) { - SQLServerStatement src_stmt = (SQLServerStatement) ((SQLServerResultSet) value.sourceResultSet).getStatement(); - int resultSetServerCursorId = ((SQLServerResultSet) value.sourceResultSet).getServerCursorId(); + SQLServerResultSet sourceResultSet = (SQLServerResultSet) value.sourceResultSet; + SQLServerStatement src_stmt = (SQLServerStatement) sourceResultSet.getStatement(); + int resultSetServerCursorId = sourceResultSet.getServerCursorId(); if (con.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) { cachedTVPHeaders = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order()); @@ -4557,6 +4558,10 @@ void writeTVPRows(TVP value) throws SQLServerException { cachedCommand = this.command; tdsWritterCached = true; + + if (sourceResultSet.isForwardOnly()) { + sourceResultSet.setFetchSize(1); + } } } } @@ -7010,7 +7015,7 @@ final void log(Level level, // interrupt is ignored. private volatile boolean interruptsEnabled = false; - void setInterruptsEnabled(boolean interruptsEnabled) { + protected void setInterruptsEnabled(boolean interruptsEnabled) { this.interruptsEnabled = interruptsEnabled; } @@ -7030,7 +7035,7 @@ private boolean wasInterrupted() { // After the request is complete, the interrupting thread must send the attention signal. private volatile boolean requestComplete; - void setRequestComplete(boolean requestComplete) { + protected void setRequestComplete(boolean requestComplete) { this.requestComplete = requestComplete; } @@ -7048,7 +7053,7 @@ boolean attentionPending() { // ENVCHANGE notifications. private volatile boolean processedResponse; - void setProcessedResponse(boolean processedResponse) { + protected void setProcessedResponse(boolean processedResponse) { this.processedResponse = processedResponse; } diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java index cde46e195..43d632bf7 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java @@ -84,7 +84,7 @@ String getClassNameLogging() { private final int serverCursorId; - int getServerCursorId() { + protected int getServerCursorId() { return serverCursorId; } @@ -452,7 +452,7 @@ private void throwNotScrollable() throws SQLServerException { true); } - private boolean isForwardOnly() { + protected boolean isForwardOnly() { return TYPE_SS_DIRECT_FORWARD_ONLY == stmt.getSQLResultSetType() || TYPE_SS_SERVER_CURSOR_FORWARD_ONLY == stmt.getSQLResultSetType(); } From 9d270e38c398674ebb98801317021c3da3767471 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 11 Apr 2017 15:30:48 -0700 Subject: [PATCH 15/20] added tests to test long characters and cursor is a combination of ResultSet.TYPE_FORWARD_ONLY and ResultSet.CONCUR_UPDATABLE --- .../sqlserver/jdbc/tvp/TVPResultSetCursorTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index 23137f9a2..8269ebedc 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -54,6 +54,14 @@ public class TVPResultSetCursorTest extends AbstractTest { */ @Test public void testServerCursors() throws SQLException { + serverCursorsTest(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + serverCursorsTest(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); + serverCursorsTest(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); + serverCursorsTest(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + } + + private void serverCursorsTest(int resultSetType, + int resultSetConcurrency) throws SQLException { conn = DriverManager.getConnection(connectionString); stmt = conn.createStatement(); @@ -65,7 +73,7 @@ public void testServerCursors() throws SQLException { populateSourceTable(); - ResultSet rs = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE).executeQuery("select * from " + srcTable); + ResultSet rs = conn.createStatement(resultSetType, resultSetConcurrency).executeQuery("select * from " + srcTable); SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); pstmt.setStructured(1, tvpName, rs); From a6d29b1149689ffb10d96c5e13a07bb066c93186 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 18 Apr 2017 16:24:34 -0700 Subject: [PATCH 16/20] fix log --- src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 1eb67e88e..2092a6064 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4576,6 +4576,7 @@ void writeTVPRows(TVP value) throws SQLServerException { command = cachedCommand; stagingBuffer.clear(); + logBuffer.clear(); writeBytes(cachedTVPHeaders.array(), 0, cachedTVPHeaders.position()); } From ed9dce6feac3ff3f4d939f434a6369d2a8bc8690 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Wed, 19 Apr 2017 13:45:05 -0700 Subject: [PATCH 17/20] after sending a row, throw exception in case of errors --- .../java/com/microsoft/sqlserver/jdbc/IOBuffer.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 2092a6064..d126d29dc 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4789,15 +4789,22 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) currentColumn++; } - // send this row, read its response and reset command status + // send this row, read its response (throw exception in case of errors) and reset command status if (tdsWritterCached) { // TVP_END_TOKEN writeByte((byte) 0x00); writePacket(TDS.STATUS_BIT_EOM); - while (tdsChannel.getReader(command).readPacket()) - ; + TDSReader tdsReader = tdsChannel.getReader(command); + int tokenType = tdsReader.peekTokenType(); + + StreamError databaseError = new StreamError(); + databaseError.setFromTDS(tdsReader); + + if (TDS.TDS_ERR == tokenType) { + SQLServerException.makeFromDatabaseError(con, null, databaseError.getMessage(), databaseError, false); + } command.setInterruptsEnabled(true); command.setRequestComplete(false); From bd872d0c88cc1f23cc06018599a3580d0f90d19d Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Wed, 19 Apr 2017 14:16:42 -0700 Subject: [PATCH 18/20] fixed assertion error and added tests for invalid SP name and invalid TVP name --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 6 +- .../jdbc/tvp/TVPResultSetCursorTest.java | 154 ++++++++++++++++++ 2 files changed, 157 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index d126d29dc..efff14683 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4799,10 +4799,10 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) TDSReader tdsReader = tdsChannel.getReader(command); int tokenType = tdsReader.peekTokenType(); - StreamError databaseError = new StreamError(); - databaseError.setFromTDS(tdsReader); - if (TDS.TDS_ERR == tokenType) { + StreamError databaseError = new StreamError(); + databaseError.setFromTDS(tdsReader); + SQLServerException.makeFromDatabaseError(con, null, databaseError.getMessage(), databaseError, false); } diff --git a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java index 8269ebedc..1374dc5c9 100644 --- a/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java +++ b/src/test/java/com/microsoft/sqlserver/jdbc/tvp/TVPResultSetCursorTest.java @@ -25,6 +25,8 @@ import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import com.microsoft.sqlserver.jdbc.SQLServerCallableStatement; +import com.microsoft.sqlserver.jdbc.SQLServerException; import com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement; import com.microsoft.sqlserver.testframework.AbstractTest; import com.microsoft.sqlserver.testframework.Utils; @@ -44,6 +46,7 @@ public class TVPResultSetCursorTest extends AbstractTest { static String[] expectedTimestampStrings = {"2015-06-03 13:35:33.4610000", "2442-09-19 01:59:43.9990000", "2017-04-02 08:58:53.0000000"}; private static String tvpName = "TVPResultSetCursorTest_TVP"; + private static String procedureName = "TVPResultSetCursorTest_SP"; private static String srcTable = "TVPResultSetCursorTest_SourceTable"; private static String desTable = "TVPResultSetCursorTest_DestinationTable"; @@ -126,6 +129,146 @@ public void testSelectMethodSetToCursor() throws SQLException { } } + /** + * Test a previous failure when setting SelectMethod to cursor and using the same connection to create TVP, SP and result set. + * + * @throws SQLException + */ + @Test + public void testSelectMethodSetToCursorWithSP() throws SQLException { + Properties info = new Properties(); + info.setProperty("SelectMethod", "cursor"); + conn = DriverManager.getConnection(connectionString, info); + + stmt = conn.createStatement(); + + dropProcedure(); + dropTVPS(); + dropTables(); + + createTVPS(); + createTables(); + createPreocedure(); + + populateSourceTable(); + + ResultSet rs = conn.createStatement().executeQuery("select * from " + srcTable); + + final String sql = "{call " + procedureName + "(?)}"; + SQLServerCallableStatement pstmt = (SQLServerCallableStatement) conn.prepareCall(sql); + pstmt.setStructured(1, tvpName, rs); + + try { + pstmt.execute(); + + verifyDestinationTableData(expectedBigDecimals.length); + } + finally { + if (null != pstmt) { + pstmt.close(); + } + if (null != rs) { + rs.close(); + } + + dropProcedure(); + } + } + + /** + * Test exception when giving invalid TVP name + * + * @throws SQLException + */ + @Test + public void testInvalidTVPName() throws SQLException { + Properties info = new Properties(); + info.setProperty("SelectMethod", "cursor"); + conn = DriverManager.getConnection(connectionString, info); + + stmt = conn.createStatement(); + + dropTVPS(); + dropTables(); + + createTVPS(); + createTables(); + + populateSourceTable(); + + ResultSet rs = conn.createStatement().executeQuery("select * from " + srcTable); + + SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement("INSERT INTO " + desTable + " select * from ? ;"); + pstmt.setStructured(1, "invalid" + tvpName, rs); + + try { + pstmt.execute(); + } + catch (SQLServerException e) { + if (!e.getMessage().contains("Cannot find data type")) { + throw e; + } + } + finally { + if (null != pstmt) { + pstmt.close(); + } + if (null != rs) { + rs.close(); + } + } + } + + /** + * Test exception when giving invalid stored procedure name + * + * @throws SQLException + */ + @Test + public void testInvalidStoredProcedureName() throws SQLException { + Properties info = new Properties(); + info.setProperty("SelectMethod", "cursor"); + conn = DriverManager.getConnection(connectionString, info); + + stmt = conn.createStatement(); + + dropProcedure(); + dropTVPS(); + dropTables(); + + createTVPS(); + createTables(); + createPreocedure(); + + populateSourceTable(); + + ResultSet rs = conn.createStatement().executeQuery("select * from " + srcTable); + + final String sql = "{call invalid" + procedureName + "(?)}"; + SQLServerCallableStatement pstmt = (SQLServerCallableStatement) conn.prepareCall(sql); + pstmt.setStructured(1, tvpName, rs); + + try { + pstmt.execute(); + } + catch (SQLServerException e) { + if (!e.getMessage().contains("Could not find stored procedure")) { + throw e; + } + } + finally { + + if (null != pstmt) { + pstmt.close(); + } + if (null != rs) { + rs.close(); + } + + dropProcedure(); + } + } + /** * test with multiple prepared statements and result sets * @@ -257,6 +400,17 @@ private static void dropTVPS() throws SQLException { stmt.execute("IF EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = '" + tvpName + "') " + " drop type " + tvpName); } + private static void dropProcedure() throws SQLException { + Utils.dropProcedureIfExists(procedureName, stmt); + } + + private static void createPreocedure() throws SQLException { + String sql = "CREATE PROCEDURE " + procedureName + " @InputData " + tvpName + " READONLY " + " AS " + " BEGIN " + " INSERT INTO " + desTable + + " SELECT * FROM @InputData" + " END"; + + stmt.execute(sql); + } + @AfterEach private void terminateVariation() throws SQLException { if (null != conn) { From 98be43965041f570c498ed375c8e97d001f2fa1e Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Mon, 24 Apr 2017 17:44:22 -0700 Subject: [PATCH 19/20] update cached variables in synchronized way --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index efff14683..1e53e9e60 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4539,6 +4539,10 @@ void writeTVPRows(TVP value) throws SQLServerException { boolean tdsWritterCached = false; ByteBuffer cachedTVPHeaders = null; TDSCommand cachedCommand = null; + + boolean cachedRequestComplete = false; + boolean cachedInterruptsEnabled = false; + boolean cachedProcessedResponse = false; if (!value.isNull()) { @@ -4557,6 +4561,10 @@ void writeTVPRows(TVP value) throws SQLServerException { cachedCommand = this.command; + cachedRequestComplete = command.requestComplete; + cachedInterruptsEnabled = command.interruptsEnabled; + cachedProcessedResponse = command.processedResponse; + tdsWritterCached = true; if (sourceResultSet.isForwardOnly()) { @@ -4806,17 +4814,15 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) SQLServerException.makeFromDatabaseError(con, null, databaseError.getMessage(), databaseError, false); } - command.setInterruptsEnabled(true); - command.setRequestComplete(false); + command.interruptsEnabled = true; + command.requestComplete = false; } } } // reset command status which have been overwritten if (tdsWritterCached) { - command.setRequestComplete(false); - command.setInterruptsEnabled(true); - command.setProcessedResponse(false); + command.resetCachedFlags(cachedRequestComplete, cachedInterruptsEnabled, cachedProcessedResponse); } else { // TVP_END_TOKEN @@ -7021,11 +7027,7 @@ final void log(Level level, // received, indicating that it is no longer able to respond to interrupts. // If the command is interrupted after interrupts have been disabled, then the // interrupt is ignored. - private volatile boolean interruptsEnabled = false; - - protected void setInterruptsEnabled(boolean interruptsEnabled) { - this.interruptsEnabled = interruptsEnabled; - } + protected volatile boolean interruptsEnabled = false; // Flag set to indicate that an interrupt has happened. private volatile boolean wasInterrupted = false; @@ -7041,11 +7043,7 @@ private boolean wasInterrupted() { // If a command is interrupted before its request is complete, it is the executing // thread's responsibility to send the attention signal to the server if necessary. // After the request is complete, the interrupting thread must send the attention signal. - private volatile boolean requestComplete; - - protected void setRequestComplete(boolean requestComplete) { - this.requestComplete = requestComplete; - } + protected volatile boolean requestComplete; // Flag set when an attention signal has been sent to the server, indicating that a // TDS packet containing the attention ack message is to be expected in the response. @@ -7059,11 +7057,7 @@ boolean attentionPending() { // Flag set when this command's response has been processed. Until this flag is set, // there may be unprocessed information left in the response, such as transaction // ENVCHANGE notifications. - private volatile boolean processedResponse; - - protected void setProcessedResponse(boolean processedResponse) { - this.processedResponse = processedResponse; - } + protected volatile boolean processedResponse; // Flag set when this command's response is ready to be read from the server and cleared // after its response has been received, but not necessarily processed, up to and including @@ -7522,6 +7516,16 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException { return tdsReader; } + + protected void resetCachedFlags(boolean cachedRequestComplete, + boolean cachedInterruptsEnabled, + boolean cachedProcessedResponse) { + synchronized (interruptLock) { + this.requestComplete = cachedRequestComplete; + this.interruptsEnabled = cachedInterruptsEnabled; + this.processedResponse = cachedProcessedResponse; + } + } } /** From c23e4997067014259e77ec82131e2458cbdfa937 Mon Sep 17 00:00:00 2001 From: v-xiangs Date: Tue, 25 Apr 2017 11:03:41 -0700 Subject: [PATCH 20/20] use setters and getters --- .../microsoft/sqlserver/jdbc/IOBuffer.java | 60 +++++++++++++------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java index 1e53e9e60..439595e39 100644 --- a/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java +++ b/src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java @@ -4561,9 +4561,9 @@ void writeTVPRows(TVP value) throws SQLServerException { cachedCommand = this.command; - cachedRequestComplete = command.requestComplete; - cachedInterruptsEnabled = command.interruptsEnabled; - cachedProcessedResponse = command.processedResponse; + cachedRequestComplete = command.getRequestComplete(); + cachedInterruptsEnabled = command.getInterruptsEnabled(); + cachedProcessedResponse = command.getProcessedResponse(); tdsWritterCached = true; @@ -4814,15 +4814,17 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength) SQLServerException.makeFromDatabaseError(con, null, databaseError.getMessage(), databaseError, false); } - command.interruptsEnabled = true; - command.requestComplete = false; + command.setInterruptsEnabled(true); + command.setRequestComplete(false); } } } // reset command status which have been overwritten if (tdsWritterCached) { - command.resetCachedFlags(cachedRequestComplete, cachedInterruptsEnabled, cachedProcessedResponse); + command.setRequestComplete(cachedRequestComplete); + command.setInterruptsEnabled(cachedInterruptsEnabled); + command.setProcessedResponse(cachedProcessedResponse); } else { // TVP_END_TOKEN @@ -7027,7 +7029,17 @@ final void log(Level level, // received, indicating that it is no longer able to respond to interrupts. // If the command is interrupted after interrupts have been disabled, then the // interrupt is ignored. - protected volatile boolean interruptsEnabled = false; + private volatile boolean interruptsEnabled = false; + + protected boolean getInterruptsEnabled() { + return interruptsEnabled; + } + + protected void setInterruptsEnabled(boolean interruptsEnabled) { + synchronized (interruptLock) { + this.interruptsEnabled = interruptsEnabled; + } + } // Flag set to indicate that an interrupt has happened. private volatile boolean wasInterrupted = false; @@ -7043,7 +7055,17 @@ private boolean wasInterrupted() { // If a command is interrupted before its request is complete, it is the executing // thread's responsibility to send the attention signal to the server if necessary. // After the request is complete, the interrupting thread must send the attention signal. - protected volatile boolean requestComplete; + private volatile boolean requestComplete; + + protected boolean getRequestComplete() { + return requestComplete; + } + + protected void setRequestComplete(boolean requestComplete) { + synchronized (interruptLock) { + this.requestComplete = requestComplete; + } + } // Flag set when an attention signal has been sent to the server, indicating that a // TDS packet containing the attention ack message is to be expected in the response. @@ -7057,7 +7079,17 @@ boolean attentionPending() { // Flag set when this command's response has been processed. Until this flag is set, // there may be unprocessed information left in the response, such as transaction // ENVCHANGE notifications. - protected volatile boolean processedResponse; + private volatile boolean processedResponse; + + protected boolean getProcessedResponse() { + return processedResponse; + } + + protected void setProcessedResponse(boolean processedResponse) { + synchronized (interruptLock) { + this.processedResponse = processedResponse; + } + } // Flag set when this command's response is ready to be read from the server and cleared // after its response has been received, but not necessarily processed, up to and including @@ -7516,16 +7548,6 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException { return tdsReader; } - - protected void resetCachedFlags(boolean cachedRequestComplete, - boolean cachedInterruptsEnabled, - boolean cachedProcessedResponse) { - synchronized (interruptLock) { - this.requestComplete = cachedRequestComplete; - this.interruptsEnabled = cachedInterruptsEnabled; - this.processedResponse = cachedProcessedResponse; - } - } } /**