Skip to content

Commit

Permalink
Merge pull request #234 from v-xiangs/TVP-Server-Cursor-Fix
Browse files Browse the repository at this point in the history
Tvp server cursor fix
  • Loading branch information
xiangyushawn authored Apr 25, 2017
2 parents cf732fe + c23e499 commit 18a9265
Show file tree
Hide file tree
Showing 5 changed files with 539 additions and 15 deletions.
113 changes: 110 additions & 3 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4525,12 +4525,59 @@ void writeTVP(TVP value) throws SQLServerException {
void writeTVPRows(TVP value) throws SQLServerException {
boolean isShortValue, isNull;
int dataLength;


boolean tdsWritterCached = false;
ByteBuffer cachedTVPHeaders = null;
TDSCommand cachedCommand = null;

boolean cachedRequestComplete = false;
boolean cachedInterruptsEnabled = false;
boolean cachedProcessedResponse = false;

if (!value.isNull()) {

// If the preparedStatement and the ResultSet are created by the same connection, and TVP is set with ResultSet and Server Cursor
// is used, the tdsWriter of the calling preparedStatement is overwritten by the SQLServerResultSet#next() method when fetching new rows.
// Therefore, we need to send TVP data row by row before fetching new row.
if (TVPType.ResultSet == value.tvpType) {
if ((null != value.sourceResultSet) && (value.sourceResultSet instanceof SQLServerResultSet)) {
SQLServerResultSet sourceResultSet = (SQLServerResultSet) value.sourceResultSet;
SQLServerStatement src_stmt = (SQLServerStatement) sourceResultSet.getStatement();
int resultSetServerCursorId = sourceResultSet.getServerCursorId();

if (con.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) {
cachedTVPHeaders = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order());
cachedTVPHeaders.put(stagingBuffer.array(), 0, stagingBuffer.position());

cachedCommand = this.command;

cachedRequestComplete = command.getRequestComplete();
cachedInterruptsEnabled = command.getInterruptsEnabled();
cachedProcessedResponse = command.getProcessedResponse();

tdsWritterCached = true;

if (sourceResultSet.isForwardOnly()) {
sourceResultSet.setFetchSize(1);
}
}
}
}

Map<Integer, SQLServerMetaData> columnMetadata = value.getColumnMetadata();
Iterator<Entry<Integer, SQLServerMetaData>> columnsIterator;

while (value.next()) {

// restore command and TDS header, which have been overwritten by value.next()
if (tdsWritterCached) {
command = cachedCommand;

stagingBuffer.clear();
logBuffer.clear();
writeBytes(cachedTVPHeaders.array(), 0, cachedTVPHeaders.position());
}

Object[] rowData = value.getRowData();

// ROW
Expand Down Expand Up @@ -4745,10 +4792,40 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength)
}
currentColumn++;
}

// send this row, read its response (throw exception in case of errors) and reset command status
if (tdsWritterCached) {
// TVP_END_TOKEN
writeByte((byte) 0x00);

writePacket(TDS.STATUS_BIT_EOM);

TDSReader tdsReader = tdsChannel.getReader(command);
int tokenType = tdsReader.peekTokenType();

if (TDS.TDS_ERR == tokenType) {
StreamError databaseError = new StreamError();
databaseError.setFromTDS(tdsReader);

SQLServerException.makeFromDatabaseError(con, null, databaseError.getMessage(), databaseError, false);
}

command.setInterruptsEnabled(true);
command.setRequestComplete(false);
}
}
}
// TVP_END_TOKEN
writeByte((byte) 0x00);

// reset command status which have been overwritten
if (tdsWritterCached) {
command.setRequestComplete(cachedRequestComplete);
command.setInterruptsEnabled(cachedInterruptsEnabled);
command.setProcessedResponse(cachedProcessedResponse);
}
else {
// TVP_END_TOKEN
writeByte((byte) 0x00);
}
}

private static byte[] toByteArray(String s) {
Expand Down Expand Up @@ -6950,6 +7027,16 @@ final void log(Level level,
// interrupt is ignored.
private volatile boolean interruptsEnabled = false;

protected boolean getInterruptsEnabled() {
return interruptsEnabled;
}

protected void setInterruptsEnabled(boolean interruptsEnabled) {
synchronized (interruptLock) {
this.interruptsEnabled = interruptsEnabled;
}
}

// Flag set to indicate that an interrupt has happened.
private volatile boolean wasInterrupted = false;

Expand All @@ -6966,6 +7053,16 @@ private boolean wasInterrupted() {
// After the request is complete, the interrupting thread must send the attention signal.
private volatile boolean requestComplete;

protected boolean getRequestComplete() {
return requestComplete;
}

protected void setRequestComplete(boolean requestComplete) {
synchronized (interruptLock) {
this.requestComplete = requestComplete;
}
}

// Flag set when an attention signal has been sent to the server, indicating that a
// TDS packet containing the attention ack message is to be expected in the response.
// This flag is cleared after the attention ack message has been received and processed.
Expand All @@ -6980,6 +7077,16 @@ boolean attentionPending() {
// ENVCHANGE notifications.
private volatile boolean processedResponse;

protected boolean getProcessedResponse() {
return processedResponse;
}

protected void setProcessedResponse(boolean processedResponse) {
synchronized (interruptLock) {
this.processedResponse = processedResponse;
}
}

// Flag set when this command's response is ready to be read from the server and cleared
// after its response has been received, but not necessarily processed, up to and including
// any attention ack. The command's response is read either on demand as it is processed,
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/com/microsoft/sqlserver/jdbc/Parameter.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,16 +331,6 @@ else if (value instanceof SQLServerDataTable) {
tvpValue = new TVP(tvpName, (SQLServerDataTable) value);
}
else if (value instanceof ResultSet) {
// if ResultSet and PreparedStatemet/CallableStatement are created from same connection object
// with property SelectMethod=cursor, TVP is not supported
if (con.getSelectMethod().equalsIgnoreCase("cursor") && (value instanceof SQLServerResultSet)) {
SQLServerStatement stmt = (SQLServerStatement) ((SQLServerResultSet) value).getStatement();

if (con.equals(stmt.connection)) {
throw new SQLServerException(SQLServerException.getErrString("R_invalidServerCursorForTVP"), null);
}
}

tvpValue = new TVP(tvpName, (ResultSet) value);
}
else if (value instanceof ISQLServerDataRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ protected Object[][] getContents() {
{"R_invalidKeyStoreFile", "Cannot parse \"{0}\". Either the file format is not valid or the password is not correct."}, // for JKS/PKCS
{"R_invalidCEKCacheTtl", "Invalid column encryption key cache time-to-live specified. The columnEncryptionKeyCacheTtl value cannot be negative and timeUnit can only be DAYS, HOURS, MINUTES or SECONDS."},
{"R_sendTimeAsDateTimeForAE", "Use sendTimeAsDateTime=false with Always Encrypted."},
{"R_invalidServerCursorForTVP" , "Use different Connection for source ResultSet and prepared query, if selectMethod is set to cursor for Table-Valued Parameter."},
{"R_TVPnotWorkWithSetObjectResultSet" , "setObject() with ResultSet is not supported for Table-Valued Parameter. Please use setStructured()"},
{"R_invalidQueryTimeout", "The queryTimeout {0} is not valid."},
{"R_invalidSocketTimeout", "The socketTimeout {0} is not valid."},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ String getClassNameLogging() {
private boolean isClosed = false;

private final int serverCursorId;

protected int getServerCursorId() {
return serverCursorId;
}

/** the intended fetch direction to optimize cursor performance */
private int fetchDirection;
Expand Down Expand Up @@ -444,7 +448,7 @@ private void throwNotScrollable() throws SQLServerException {
true);
}

private boolean isForwardOnly() {
protected boolean isForwardOnly() {
return TYPE_SS_DIRECT_FORWARD_ONLY == stmt.getSQLResultSetType() || TYPE_SS_SERVER_CURSOR_FORWARD_ONLY == stmt.getSQLResultSetType();
}

Expand Down
Loading

0 comments on commit 18a9265

Please sign in to comment.