diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/util/Closables.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/util/Closables.java new file mode 100644 index 000000000000..9c4ca8c163bc --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/util/Closables.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.util; + +import static java.util.Objects.requireNonNull; + +public final class Closables +{ + private Closables() {} + + public static T closeAllSuppress(T rootCause, AutoCloseable... closeables) + { + requireNonNull(rootCause, "rootCause is null"); + requireNonNull(closeables, "closeables is null"); + for (AutoCloseable closeable : closeables) { + try { + if (closeable != null) { + closeable.close(); + } + } + catch (Throwable e) { + // Self-suppression not permitted + if (rootCause != e) { + rootCause.addSuppressed(e); + } + } + } + return rootCause; + } +} diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/util/TestClosables.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/util/TestClosables.java new file mode 100644 index 000000000000..ec6d20fe3096 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/util/TestClosables.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.util; + +import org.testng.annotations.Test; + +import java.util.Optional; + +import static io.trino.plugin.base.util.Closables.closeAllSuppress; +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertTrue; + +public class TestClosables +{ + @Test + public void testCloseAllSuppressNonThrowing() + { + RuntimeException rootException = new RuntimeException("root"); + TestClosable closable = new TestClosable(Optional.empty()); + closeAllSuppress(rootException, closable); + assertTrue(closable.isClosed()); + assertThat(rootException.getSuppressed()).isEmpty(); + } + + @Test + public void testCloseAllSuppressThrowingOther() + { + RuntimeException rootException = new RuntimeException("root"); + RuntimeException closeException = new RuntimeException("close"); + TestClosable closable = new TestClosable(Optional.of(closeException)); + closeAllSuppress(rootException, closable); + assertTrue(closable.isClosed()); + assertThat(rootException.getSuppressed()).containsExactly(closeException); + } + + @Test + public void testCloseAllSuppressThrowingRoot() + { + RuntimeException rootException = new RuntimeException("root"); + TestClosable closable = new TestClosable(Optional.of(rootException)); + closeAllSuppress(rootException, closable); + assertTrue(closable.isClosed()); + assertThat(rootException.getSuppressed()).isEmpty(); + } + + @Test + public void testCloseAllSuppressNullClosable() + { + RuntimeException rootException = new RuntimeException("root"); + closeAllSuppress(rootException, (AutoCloseable) null); + assertThat(rootException.getSuppressed()).isEmpty(); + } + + @Test + public void testCloseAllSuppressMultipleClosables() + { + RuntimeException rootException = new RuntimeException("root"); + RuntimeException closeException1 = new RuntimeException("close"); + RuntimeException closeException2 = new RuntimeException("close2"); + TestClosable closable1 = new TestClosable(Optional.of(closeException1)); + TestClosable closable2 = new TestClosable(Optional.of(closeException2)); + TestClosable closable3 = new TestClosable(Optional.empty()); // non throwing + TestClosable closable4 = new TestClosable(Optional.of(rootException)); // throwing root + closeAllSuppress(rootException, closable1, closable2, closable3, closable4, null); + assertTrue(closable1.isClosed()); + assertTrue(closable2.isClosed()); + assertTrue(closable3.isClosed()); + assertTrue(closable4.isClosed()); + assertThat(rootException.getSuppressed()).containsExactly(closeException1, closeException2); + } + + private static class TestClosable + implements AutoCloseable + { + private final Optional closeException; + private boolean closed; + + public TestClosable(Optional closeException) + { + this.closeException = requireNonNull(closeException, "closeException is null"); + } + + @Override + public void close() + throws Exception + { + closed = true; + if (closeException.isPresent()) { + throw closeException.get(); + } + } + + public boolean isClosed() + { + return closed; + } + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java index f6e1abe5eea5..847f0a7ce93c 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java @@ -33,6 +33,7 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR; import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.getWriteBatchSize; @@ -68,7 +69,7 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc connection.setAutoCommit(false); } catch (SQLException e) { - closeWithSuppression(connection, e); + closeAllSuppress(e, connection); throw new TrinoException(JDBC_ERROR, e); } @@ -102,7 +103,7 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc statement = connection.prepareStatement(jdbcClient.buildInsertSql(handle, columnWriters)); } catch (SQLException e) { - closeWithSuppression(connection, e); + closeAllSuppress(e, connection); throw new TrinoException(JDBC_ERROR, e); } @@ -212,18 +213,4 @@ public void abort() throw new TrinoException(JDBC_ERROR, e); } } - - @SuppressWarnings("ObjectEquality") - private static void closeWithSuppression(Connection connection, Throwable throwable) - { - try { - connection.close(); - } - catch (Throwable t) { - // Self-suppression not permitted - if (throwable != t) { - throwable.addSuppressed(t); - } - } - } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursor.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursor.java index 0e24e4d173b1..8ac0b6a7cb22 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursor.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursor.java @@ -59,10 +59,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.trino.plugin.base.type.TrinoTimestampEncoderFactory.createTimestampEncoder; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; -import static io.trino.plugin.hive.util.HiveUtil.closeWithSuppression; import static io.trino.plugin.hive.util.HiveUtil.getDeserializer; import static io.trino.plugin.hive.util.HiveUtil.getTableObjectInspector; import static io.trino.plugin.hive.util.HiveUtil.isStructuralType; @@ -226,7 +226,7 @@ public boolean advanceNextPosition() return true; } catch (IOException | SerDeException | RuntimeException e) { - closeWithSuppression(this, e); + closeAllSuppress(e, this); if (e instanceof TextLineLengthLimitExceededException) { throw new TrinoException(HIVE_BAD_DATA, "Line too long in text file: " + path, e); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java index 02c6f2caa950..78ff27131f15 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java @@ -62,6 +62,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveColumnHandle.isRowIdColumnHandle; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; @@ -243,11 +244,11 @@ public Page getNextPage() return page; } catch (TrinoException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw e; } catch (RuntimeException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw new TrinoException(HIVE_CURSOR_ERROR, e); } } @@ -281,20 +282,6 @@ public Metrics getMetrics() return delegate.getMetrics(); } - protected void closeWithSuppression(Throwable throwable) - { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (throwable != e) { - throwable.addSuppressed(e); - } - } - } - public ConnectorPageSource getPageSource() { return delegate; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java index b4fe8807fb76..808ece55d05f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java @@ -50,6 +50,7 @@ import static io.trino.orc.OrcReader.MAX_BATCH_SIZE; import static io.trino.orc.OrcReader.createOrcReader; import static io.trino.orc.OrcReader.fullyProjectedLayout; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static io.trino.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA; import static io.trino.plugin.hive.acid.AcidSchema.ACID_COLUMN_BUCKET; @@ -190,7 +191,7 @@ public Page getNextPage() return page; } catch (IOException | RuntimeException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw handleException(orcDataSource.getId(), e); } } @@ -227,20 +228,6 @@ public long getSystemMemoryUsage() return systemMemoryContext.getBytes(); } - private void closeWithSuppression(Throwable throwable) - { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (throwable != e) { - throwable.addSuppressed(e); - } - } - } - private static String openError(Throwable t, Path path) { return format("Error opening Hive delete delta file %s: %s", path, t.getMessage()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java index 94d2c90a1e44..2e188bd3260d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java @@ -46,6 +46,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; import static io.trino.plugin.hive.HiveUpdatablePageSource.BUCKET_CHANNEL; @@ -150,7 +151,7 @@ public Page getNextPage() } } catch (IOException | RuntimeException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw handleException(orcDataSource.getId(), e); } @@ -244,20 +245,6 @@ public long getSystemMemoryUsage() return systemMemoryContext.getBytes(); } - private void closeWithSuppression(Throwable throwable) - { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (throwable != e) { - throwable.addSuppressed(e); - } - } - } - public interface ColumnAdaptation { Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java index c97266386459..e4d921793794 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java @@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; import static java.lang.String.format; @@ -157,29 +158,15 @@ public Page getNextPage() return new Page(batchSize, blocks); } catch (TrinoException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw e; } catch (RuntimeException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw new TrinoException(HIVE_CURSOR_ERROR, e); } } - private void closeWithSuppression(Throwable throwable) - { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (e != throwable) { - throwable.addSuppressed(e); - } - } - } - @Override public void close() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSource.java index dee08b8f58e6..074ca42f8b65 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSource.java @@ -34,6 +34,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; import static java.lang.String.format; @@ -138,15 +139,15 @@ public Page getNextPage() return new Page(currentPageSize, blocks); } catch (TrinoException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw e; } catch (RcFileCorruptionException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw new TrinoException(HIVE_BAD_DATA, format("Corrupted RC file: %s", rcFileReader.getId()), e); } catch (IOException | RuntimeException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw new TrinoException(HIVE_CURSOR_ERROR, format("Failed to read RC file: %s", rcFileReader.getId()), e); } } @@ -179,19 +180,6 @@ public long getSystemMemoryUsage() return GUESSED_MEMORY_USAGE; } - private void closeWithSuppression(Throwable throwable) - { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (Exception e) { - if (e != throwable) { - throwable.addSuppressed(e); - } - } - } - private Block createBlock(int currentPageSize, int fieldId) { int hiveColumnIndex = hiveColumnIndexes[fieldId]; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java index 02e4a8910433..7ffbc8be8487 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java @@ -36,7 +36,6 @@ import io.trino.plugin.hive.metastore.Table; import io.trino.spi.ErrorCodeSupplier; import io.trino.spi.TrinoException; -import io.trino.spi.connector.RecordCursor; import io.trino.spi.predicate.NullableValue; import io.trino.spi.type.ArrayType; import io.trino.spi.type.CharType; @@ -164,7 +163,6 @@ import static java.math.BigDecimal.ROUND_UNNECESSARY; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Locale.ENGLISH; -import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; import static org.apache.hadoop.hive.common.FileUtils.unescapePathName; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; @@ -1024,21 +1022,6 @@ else if (type.equals(VarbinaryType.VARBINARY)) { throw new TrinoException(NOT_SUPPORTED, format("Unsupported column type %s for prefilled column: %s", type.getDisplayName(), name)); } - public static void closeWithSuppression(RecordCursor recordCursor, Throwable throwable) - { - requireNonNull(recordCursor, "recordCursor is null"); - requireNonNull(throwable, "throwable is null"); - try { - recordCursor.close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (throwable != e) { - throwable.addSuppressed(e); - } - } - } - public static List extractStructFieldTypes(HiveType hiveType) { return ((StructTypeInfo) hiveType.getTypeInfo()).getAllStructFieldTypeInfos().stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java index d2915188412c..586e954adf92 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java @@ -30,6 +30,7 @@ import java.util.OptionalLong; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static java.util.Objects.requireNonNull; @@ -153,15 +154,6 @@ public long getSystemMemoryUsage() protected void closeWithSuppression(Throwable throwable) { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (throwable != e) { - throwable.addSuppressed(e); - } - } + closeAllSuppress(throwable, this); } } diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/RaptorPageSource.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/RaptorPageSource.java index f363f4827d32..0ab12ea9a2e3 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/RaptorPageSource.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/RaptorPageSource.java @@ -39,6 +39,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.SHARD_UUID_COLUMN_TYPE; import static io.trino.plugin.raptor.legacy.RaptorErrorCode.RAPTOR_ERROR; import static io.trino.spi.type.BigintType.BIGINT; @@ -104,7 +105,7 @@ public Page getNextPage() page = recordReader.nextPage(); } catch (IOException | RuntimeException e) { - closeWithSuppression(e); + closeAllSuppress(e, this); throw handleException(e); } @@ -175,20 +176,6 @@ public long getSystemMemoryUsage() return systemMemoryContext.getBytes(); } - private void closeWithSuppression(Throwable throwable) - { - requireNonNull(throwable, "throwable is null"); - try { - close(); - } - catch (RuntimeException e) { - // Self-suppression not permitted - if (throwable != e) { - throwable.addSuppressed(e); - } - } - } - public interface ColumnAdaptation { Block block(Page sourcePage, long filePosition);