Skip to content

Commit

Permalink
Don't abort the cursor when we're at the end of JDBC resultset
Browse files Browse the repository at this point in the history
The JDBC `abort` operation is a heavy one - so heavy, in fact, that it
expects a separate `Executor` to offload it to. So we'd like to not call
it if we can avoid it.

The original rationale was that some JDBC drivers insist on draining the
resultset on `close` when there are still results to be returned, which
is a problem when we abort a query before it finishes execution. But we
don't have to do it unconditionally.

This commit short-circuits the abort call when the
`ResultSet#isAfterLast` returns `true`. This is an optional method and
it's not guaranteed that all al vendors implement it, though. So we push
this check to driver-specific implementations wherever it is necessary
and we know it's supported.
  • Loading branch information
ksobolew authored and kokosing committed Nov 16, 2021
1 parent c63e06f commit e4906fd
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -202,10 +203,10 @@ public Connection getConnection(ConnectorSession session, JdbcSplit split)
}

@Override
public void abortReadConnection(Connection connection)
public void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
{
delegate.abortReadConnection(connection);
delegate.abortReadConnection(connection, resultSet);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -132,10 +133,10 @@ public Connection getConnection(ConnectorSession session, JdbcSplit split)
}

@Override
public void abortReadConnection(Connection connection)
public void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
{
delegate().abortReadConnection(connection);
delegate().abortReadConnection(connection, resultSet);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,7 +80,7 @@ default Optional<JdbcExpression> implementAggregation(ConnectorSession session,
Connection getConnection(ConnectorSession session, JdbcSplit split)
throws SQLException;

default void abortReadConnection(Connection connection)
default void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
{
// most drivers do not need this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ public void close()
// statement already closed or cancel is not supported
}
}
if (connection != null) {
jdbcClient.abortReadConnection(connection);
if (connection != null && resultSet != null) {
jdbcClient.abortReadConnection(connection, resultSet);
}
}
catch (SQLException | RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -152,10 +153,10 @@ public Connection getConnection(ConnectorSession session, JdbcSplit split)
}

@Override
public void abortReadConnection(Connection connection)
public void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
{
stats.getAbortReadConnection().wrap(() -> delegate().abortReadConnection(connection));
stats.getAbortReadConnection().wrap(() -> delegate().abortReadConnection(connection, resultSet));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,14 @@ protected boolean filterSchema(String schemaName)
}

@Override
public void abortReadConnection(Connection connection)
public void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
{
// Abort connection before closing. Without this, the MySQL driver
// attempts to drain the connection by reading all the results.
connection.abort(directExecutor());
if (!resultSet.isAfterLast()) {
// Abort connection before closing. Without this, the MySQL driver
// attempts to drain the connection by reading all the results.
connection.abort(directExecutor());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
Expand Down Expand Up @@ -634,12 +635,14 @@ public Map<String, Object> getTableProperties(ConnectorSession session, JdbcTabl
}

@Override
public void abortReadConnection(Connection connection)
public void abortReadConnection(Connection connection, ResultSet resultSet)
throws SQLException
{
// Abort connection before closing. Without this, the SQL Server driver
// attempts to drain the connection by reading all the results.
connection.abort(directExecutor());
if (!resultSet.isAfterLast()) {
// Abort connection before closing. Without this, the SQL Server driver
// attempts to drain the connection by reading all the results.
connection.abort(directExecutor());
}
}

@Override
Expand Down

0 comments on commit e4906fd

Please sign in to comment.