From 20a38b0fa78ca41a1a7e992ed7286de210cc0cde Mon Sep 17 00:00:00 2001 From: Konrad Dziedzic Date: Fri, 10 Dec 2021 09:38:51 +0100 Subject: [PATCH] Fix inserting into transactional table when task_writer_count > 1 fixes: https://github.com/trinodb/trino/issues/9149 --- .../StreamPropertyDerivations.java | 2 - .../plugin/hive/TestHiveAlluxioMetastore.java | 14 +++++ .../io/trino/plugin/hive/HivePageSink.java | 7 ++- .../plugin/hive/HivePageSinkProvider.java | 1 + .../trino/plugin/hive/AbstractTestHive.java | 63 ++++++++++++++++--- .../hive/TestHiveTransactionalTable.java | 54 ++++++++++++++++ 6 files changed, 131 insertions(+), 10 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java index 2a04e2f95f1d..ab85e1ed2ca7 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java @@ -691,8 +691,6 @@ private StreamProperties( checkArgument(distribution != SINGLE || this.partitioningColumns.equals(Optional.of(ImmutableList.of())), "Single stream must be partitioned on empty set"); - checkArgument(distribution == SINGLE || !this.partitioningColumns.equals(Optional.of(ImmutableList.of())), - "Multiple streams must not be partitioned on empty set"); this.ordered = ordered; checkArgument(!ordered || distribution == SINGLE, "Ordered must be a single stream"); diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java index 55f6723dc9ff..6b159364bcd1 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java @@ -352,4 +352,18 @@ public void testNewDirectoryPermissions() { // Alluxio metastore does not support create operations } + + @Override + public void testInsertBucketedTransactionalTableLayout() + throws Exception + { + // Alluxio metastore does not support insert/update/delete operations + } + + @Override + public void testInsertPartitionedBucketedTransactionalTableLayout() + throws Exception + { + // Alluxio metastore does not support insert/update/delete operations + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java index 678ebcd085e8..425282280018 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java @@ -68,6 +68,7 @@ public class HivePageSink private final HiveWriterFactory writerFactory; + private final boolean isTransactional; private final int[] dataColumnInputIndex; // ordinal of columns (not counting sample weight column) private final int[] partitionColumnsInputIndex; // ordinal of columns (not counting sample weight column) @@ -98,6 +99,7 @@ public class HivePageSink public HivePageSink( HiveWriterFactory writerFactory, List inputColumns, + boolean isTransactional, Optional bucketProperty, PageIndexerFactory pageIndexerFactory, HdfsEnvironment hdfsEnvironment, @@ -112,6 +114,7 @@ public HivePageSink( requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); + this.isTransactional = isTransactional; this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.maxOpenWriters = maxOpenWriters; this.writeVerificationExecutor = requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null"); @@ -361,7 +364,9 @@ private int[] getWriterIndexes(Page page) HiveWriter writer = writers.get(writerIndex); if (writer != null) { // if current file not too big continue with the current writer - if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) { + // for transactional tables we don't want to split output files because there is an explicit or implicit bucketing + // and file names have no random component (e.g. bucket_00000) + if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) { continue; } // close current writer diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java index d632c2a13c3a..29b9736b6251 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java @@ -173,6 +173,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean return new HivePageSink( writerFactory, handle.getInputColumns(), + handle.isTransactional(), handle.getBucketProperty(), pageIndexerFactory, hdfsEnvironment, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 91e77e010477..79da1f6a6064 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -171,6 +171,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.builder; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.concat; @@ -227,6 +228,7 @@ import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY; import static io.trino.plugin.hive.HiveTableProperties.SORTED_BY_PROPERTY; import static io.trino.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY; +import static io.trino.plugin.hive.HiveTableProperties.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTableRedirectionsProvider.NO_REDIRECTIONS; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; import static io.trino.plugin.hive.HiveTestUtils.SESSION; @@ -5094,10 +5096,27 @@ protected static List filterNonHiddenColumnMetadata(Collection columns, List partitionColumns) throws Exception { - createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty()); + createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty(), false); } - private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List columns, List partitionColumns, Optional bucketProperty) + private void createEmptyTable( + SchemaTableName schemaTableName, + HiveStorageFormat hiveStorageFormat, + List columns, + List partitionColumns, + Optional bucketProperty) + throws Exception + { + createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, bucketProperty, false); + } + + protected void createEmptyTable( + SchemaTableName schemaTableName, + HiveStorageFormat hiveStorageFormat, + List columns, + List partitionColumns, + Optional bucketProperty, + boolean isTransactional) throws Exception { Path targetPath; @@ -5113,14 +5132,18 @@ private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, Optional.empty()); targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath(); + ImmutableMap.Builder tableParamBuilder = ImmutableMap.builder() + .put(PRESTO_VERSION_NAME, TEST_SERVER_VERSION) + .put(PRESTO_QUERY_ID_NAME, session.getQueryId()); + if (isTransactional) { + tableParamBuilder.put(TRANSACTIONAL, "true"); + } Table.Builder tableBuilder = Table.builder() .setDatabaseName(schemaName) .setTableName(tableName) .setOwner(Optional.of(tableOwner)) .setTableType(TableType.MANAGED_TABLE.name()) - .setParameters(ImmutableMap.of( - PRESTO_VERSION_NAME, TEST_SERVER_VERSION, - PRESTO_QUERY_ID_NAME, session.getQueryId())) + .setParameters(tableParamBuilder.build()) .setDataColumns(columns) .setPartitionColumns(partitionColumns); @@ -5241,6 +5264,19 @@ public void testPreferredInsertLayout() @Test public void testInsertBucketedTableLayout() throws Exception + { + insertBucketedTableLayout(false); + } + + @Test + public void testInsertBucketedTransactionalTableLayout() + throws Exception + { + insertBucketedTableLayout(true); + } + + protected void insertBucketedTableLayout(boolean transactional) + throws Exception { SchemaTableName tableName = temporaryTable("empty_bucketed_table"); try { @@ -5248,7 +5284,7 @@ public void testInsertBucketedTableLayout() new Column("column1", HIVE_STRING, Optional.empty()), new Column("column2", HIVE_LONG, Optional.empty())); HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of()); - createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty)); + createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty), transactional); try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); @@ -5277,6 +5313,19 @@ public void testInsertBucketedTableLayout() @Test public void testInsertPartitionedBucketedTableLayout() throws Exception + { + insertPartitionedBucketedTableLayout(false); + } + + @Test + public void testInsertPartitionedBucketedTransactionalTableLayout() + throws Exception + { + insertPartitionedBucketedTableLayout(true); + } + + protected void insertPartitionedBucketedTableLayout(boolean transactional) + throws Exception { SchemaTableName tableName = temporaryTable("empty_partitioned_table"); try { @@ -5285,7 +5334,7 @@ public void testInsertPartitionedBucketedTableLayout() new Column("column1", HIVE_STRING, Optional.empty()), partitioningColumn); HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of()); - createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty)); + createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty), transactional); try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java index 1c42f1b42c45..cd8d9e32ff30 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java @@ -72,6 +72,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestHiveTransactionalTable extends HiveProductTest @@ -1900,6 +1902,58 @@ public void testDeleteAfterMajorCompaction() }); } + @Test + public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThanOne() + { + unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(true); + } + + @Test + public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThanOne() + { + unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(false); + } + + private void unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(boolean isPartitioned) + { + withTemporaryTable(format("test_unbucketed%s_transactional_table_with_task_writer_count_greater_than_one", isPartitioned ? "_partitioned" : ""), true, isPartitioned, NONE, tableName -> { + onTrino().executeQuery(format( + "CREATE TABLE %s " + + "WITH (" + + "format='ORC', " + + "transactional=true " + + "%s" + + ") AS SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " + + "FROM tpch.sf1000.orders LIMIT 0", tableName, isPartitioned ? ", partitioned_by = ARRAY['orderpriority']" : "")); + onTrino().executeQuery("SET SESSION scale_writers = true"); + onTrino().executeQuery("SET SESSION writer_min_size = '4kB'"); + onTrino().executeQuery("SET SESSION task_writer_count = 4"); + onTrino().executeQuery("SET SESSION hive.target_max_file_size = '1MB'"); + + onTrino().executeQuery( + format( + "INSERT INTO %s SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " + + "FROM tpch.sf1000.orders LIMIT 100000", tableName)); + assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000)); + int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount(); + int expectedNumberOfPartitions = isPartitioned ? 5 : 1; + assertEquals(numberOfCreatedFiles, expectedNumberOfPartitions, format("There should be only %s files created", expectedNumberOfPartitions)); + + int sizeBeforeDeletion = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size(); + + onTrino().executeQuery(format("DELETE FROM %s WHERE (orderkey %% 2) = 0", tableName)); + assertThat(onTrino().executeQuery(format("SELECT COUNT (orderkey) FROM %s WHERE orderkey %% 2 = 0", tableName))).containsOnly(row(0)); + + int sizeOnTrinoWithWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size(); + int sizeOnHiveWithWhere = onHive().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size(); + int sizeOnTrinoWithoutWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size(); + + assertEquals(sizeOnHiveWithWhere, sizeOnTrinoWithWhere); + assertEquals(sizeOnTrinoWithWhere, sizeOnTrinoWithoutWhere); + assertTrue(sizeBeforeDeletion > sizeOnTrinoWithoutWhere); + }); + } + private void hdfsDeleteAll(String directory) { if (!hdfsClient.exist(directory)) {