From e4906fd50b66c67608a5d13ac700f14460a1e453 Mon Sep 17 00:00:00 2001 From: Krzysztof Sobolewski Date: Thu, 28 Oct 2021 14:23:35 +0200 Subject: [PATCH] Don't abort the cursor when we're at the end of JDBC resultset The JDBC `abort` operation is a heavy one - so heavy, in fact, that it expects a separate `Executor` to offload it to. So we'd like to not call it if we can avoid it. The original rationale was that some JDBC drivers insist on draining the resultset on `close` when there are still results to be returned, which is a problem when we abort a query before it finishes execution. But we don't have to do it unconditionally. This commit short-circuits the abort call when the `ResultSet#isAfterLast` returns `true`. This is an optional method and it's not guaranteed that all al vendors implement it, though. So we push this check to driver-specific implementations wherever it is necessary and we know it's supported. --- .../java/io/trino/plugin/jdbc/CachingJdbcClient.java | 5 +++-- .../io/trino/plugin/jdbc/ForwardingJdbcClient.java | 5 +++-- .../main/java/io/trino/plugin/jdbc/JdbcClient.java | 3 ++- .../java/io/trino/plugin/jdbc/JdbcRecordCursor.java | 4 ++-- .../plugin/jdbc/jmx/StatisticsAwareJdbcClient.java | 5 +++-- .../main/java/io/trino/plugin/mysql/MySqlClient.java | 10 ++++++---- .../io/trino/plugin/sqlserver/SqlServerClient.java | 11 +++++++---- 7 files changed, 26 insertions(+), 17 deletions(-) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 65c68921737a..73c33845e46c 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -46,6 +46,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -202,10 +203,10 @@ public Connection getConnection(ConnectorSession session, JdbcSplit split) } @Override - public void abortReadConnection(Connection connection) + public void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException { - delegate.abortReadConnection(connection); + delegate.abortReadConnection(connection, resultSet); } @Override diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java index 04ffc5633cab..0ff945fd177b 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java @@ -30,6 +30,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -132,10 +133,10 @@ public Connection getConnection(ConnectorSession session, JdbcSplit split) } @Override - public void abortReadConnection(Connection connection) + public void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException { - delegate().abortReadConnection(connection); + delegate().abortReadConnection(connection, resultSet); } @Override diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java index e2b520039af8..8b827c2c6692 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java @@ -31,6 +31,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -79,7 +80,7 @@ default Optional implementAggregation(ConnectorSession session, Connection getConnection(ConnectorSession session, JdbcSplit split) throws SQLException; - default void abortReadConnection(Connection connection) + default void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException { // most drivers do not need this diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordCursor.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordCursor.java index aa1a94b1a793..48396bdd894a 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordCursor.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordCursor.java @@ -276,8 +276,8 @@ public void close() // statement already closed or cancel is not supported } } - if (connection != null) { - jdbcClient.abortReadConnection(connection); + if (connection != null && resultSet != null) { + jdbcClient.abortReadConnection(connection, resultSet); } } catch (SQLException | RuntimeException e) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java index 55714be31736..c3ad0824535c 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java @@ -46,6 +46,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -152,10 +153,10 @@ public Connection getConnection(ConnectorSession session, JdbcSplit split) } @Override - public void abortReadConnection(Connection connection) + public void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException { - stats.getAbortReadConnection().wrap(() -> delegate().abortReadConnection(connection)); + stats.getAbortReadConnection().wrap(() -> delegate().abortReadConnection(connection, resultSet)); } @Override diff --git a/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java b/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java index 2093cddc2885..ea2b12da6717 100644 --- a/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java +++ b/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java @@ -228,12 +228,14 @@ protected boolean filterSchema(String schemaName) } @Override - public void abortReadConnection(Connection connection) + public void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException { - // Abort connection before closing. Without this, the MySQL driver - // attempts to drain the connection by reading all the results. - connection.abort(directExecutor()); + if (!resultSet.isAfterLast()) { + // Abort connection before closing. Without this, the MySQL driver + // attempts to drain the connection by reading all the results. + connection.abort(directExecutor()); + } } @Override diff --git a/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java b/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java index 8dbfd691f74b..13d2090b4663 100644 --- a/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java +++ b/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java @@ -71,6 +71,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; import java.time.Instant; @@ -634,12 +635,14 @@ public Map getTableProperties(ConnectorSession session, JdbcTabl } @Override - public void abortReadConnection(Connection connection) + public void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException { - // Abort connection before closing. Without this, the SQL Server driver - // attempts to drain the connection by reading all the results. - connection.abort(directExecutor()); + if (!resultSet.isAfterLast()) { + // Abort connection before closing. Without this, the SQL Server driver + // attempts to drain the connection by reading all the results. + connection.abort(directExecutor()); + } } @Override