From 19e595f56ed98aad4da67b8377d33ebb84c09bbd Mon Sep 17 00:00:00 2001 From: Arkadiusz Czajkowski Date: Wed, 2 Feb 2022 13:56:00 +0100 Subject: [PATCH] Add Iceberg tests with MinIO --- .../hive/containers/HiveMinioDataLake.java | 2 +- .../hive-core-site.xml | 4 + plugin/trino-iceberg/pom.xml | 12 ++ .../iceberg/BaseIcebergConnectorTest.java | 191 +++++++++------- .../BaseIcebergMinioConnectorTest.java | 203 ++++++++++++++++++ .../trino/plugin/iceberg/IcebergTestUtil.java | 62 ++++++ .../TestIcebergMinioOrcConnectorTest.java | 51 +++++ .../TestIcebergMinioParquetConnectorTest.java | 51 +++++ .../iceberg/TestIcebergOrcConnectorTest.java | 12 +- .../TestIcebergParquetConnectorTest.java | 25 +-- .../io/trino/testing/BaseConnectorTest.java | 23 +- 11 files changed, 533 insertions(+), 103 deletions(-) rename plugin/trino-hive/src/test/resources/{hive_s3_insert_overwrite => hive_minio_datalake}/hive-core-site.xml (82%) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtil.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorTest.java diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java index b58855bd7da1..28502f371d7f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java @@ -64,7 +64,7 @@ public HiveMinioDataLake(String bucketName, Map hiveHadoopFilesT this.hiveHadoop = closer.register( HiveHadoop.builder() .withFilesToMount(ImmutableMap.builder() - .put("hive_s3_insert_overwrite/hive-core-site.xml", "/etc/hadoop/conf/core-site.xml") + .put("hive_minio_datalake/hive-core-site.xml", "/etc/hadoop/conf/core-site.xml") .putAll(hiveHadoopFilesToMount) .buildOrThrow()) .withImage(hiveHadoopImage) diff --git a/plugin/trino-hive/src/test/resources/hive_s3_insert_overwrite/hive-core-site.xml b/plugin/trino-hive/src/test/resources/hive_minio_datalake/hive-core-site.xml similarity index 82% rename from plugin/trino-hive/src/test/resources/hive_s3_insert_overwrite/hive-core-site.xml rename to plugin/trino-hive/src/test/resources/hive_minio_datalake/hive-core-site.xml index 0679865ea4be..38083c633ed9 100644 --- a/plugin/trino-hive/src/test/resources/hive_s3_insert_overwrite/hive-core-site.xml +++ b/plugin/trino-hive/src/test/resources/hive_minio_datalake/hive-core-site.xml @@ -20,4 +20,8 @@ fs.s3a.path.style.access true + + fs.s3.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index c98357752bdb..fba2b08a02c7 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -16,6 +16,13 @@ ${project.parent.basedir} 0.13.1 + + classes @@ -84,6 +91,11 @@ units + + com.amazonaws + aws-java-sdk-s3 + + com.fasterxml.jackson.core jackson-core diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index c711a7b010d6..8cfed15a999d 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -14,7 +14,6 @@ package io.trino.plugin.iceberg; import com.google.common.base.VerifyException; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; @@ -45,7 +44,7 @@ import io.trino.testng.services.Flaky; import io.trino.tpch.TpchTable; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -56,8 +55,12 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.BufferedInputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; @@ -122,24 +125,47 @@ public abstract class BaseIcebergConnectorTest { private static final Pattern WITH_CLAUSE_EXTRACTOR = Pattern.compile(".*(WITH\\s*\\([^)]*\\))\\s*$", Pattern.DOTALL); - private final IcebergFileFormat format; + protected static final String TEST_REPARTITION_COMPLEX = "'bucket(custkey, 4)', 'truncate(comment, 1)'"; + protected static final String TEST_REPARTITION_SAME_COLUMN_MULTIPLE_TIMES = "'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'"; + + protected final String schemaName; + protected final IcebergFileFormat format; protected BaseIcebergConnectorTest(IcebergFileFormat format) { + this.schemaName = "tpch" + format.name().toLowerCase(Locale.ENGLISH); this.format = requireNonNull(format, "format is null"); } @Override protected QueryRunner createQueryRunner() throws Exception + { + return createQueryRunner(Map.of()); + } + + protected QueryRunner createQueryRunner(Map connectorProperties) + throws Exception { return createIcebergQueryRunner( - Map.of(), - Map.of("iceberg.file-format", format.name()), - ImmutableList.>builder() - .addAll(REQUIRED_TPCH_TABLES) - .add(LINE_ITEM) - .build()); + ImmutableMap.of(), + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .putAll(connectorProperties) + .buildOrThrow(), + createSchemaInitializer(schemaName).build(), + Optional.empty()); + } + + protected SchemaInitializer.Builder createSchemaInitializer(String schemaName) + { + return SchemaInitializer.builder() + .withSchemaName(schemaName) + .withClonedTpchTables( + ImmutableSet.>builder() + .addAll(REQUIRED_TPCH_TABLES) + .add(LINE_ITEM) + .build()); } @Override @@ -233,11 +259,11 @@ public void testCharVarcharComparison() @Override public void testShowCreateSchema() { - assertThat(computeActual("SHOW CREATE SCHEMA tpch").getOnlyValue().toString()) - .matches("CREATE SCHEMA iceberg.tpch\n" + + assertThat(computeActual("SHOW CREATE SCHEMA " + schemaName).getOnlyValue().toString()) + .matches("CREATE SCHEMA iceberg." 
+ schemaName + "\n" + "AUTHORIZATION USER user\n" + "WITH \\(\n" + - "\\s+location = '.*/iceberg_data/tpch'\n" + + "\\s+location = '.*/iceberg_data/" + schemaName + "'\n" + "\\)"); } @@ -266,7 +292,7 @@ public void testShowCreateTable() { File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue()) - .isEqualTo("CREATE TABLE iceberg.tpch.orders (\n" + + .isEqualTo("CREATE TABLE iceberg." + schemaName + ".orders (\n" + " orderkey bigint,\n" + " custkey bigint,\n" + " orderstatus varchar,\n" + @@ -279,7 +305,7 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " format = '" + format.name() + "',\n" + - " location = '" + tempDir + "/iceberg_data/tpch/orders'\n" + + " location = '" + tempDir + "/iceberg_data/" + schemaName + "/orders'\n" + ")"); } @@ -871,15 +897,24 @@ public void testCreatePartitionedTableWithNestedTypes() dropTable("test_partitioned_table_nested_type"); } + protected String createNewTableLocationBase(String schemaName) + { + return getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile().toURI().getPath() + "iceberg_data/" + schemaName; + } + + protected String createNewTableLocation(String schemaName) + { + return createNewTableLocationBase(schemaName) + "/tmp" + randomTableSuffix(); + } + @Test public void testCreatePartitionedTableAs() { - File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); - String tempDirPath = tempDir.toURI().toASCIIString() + randomTableSuffix(); + String location = createNewTableLocation(schemaName); assertUpdate( "CREATE TABLE test_create_partitioned_table_as " + "WITH (" + - "location = '" + tempDirPath + "', " + + "location = '" + location + "', " + "partitioning = ARRAY['ORDER_STATUS', 'Ship_Priority', 'Bucket(order_key,9)']" + ") " + "AS " + @@ -904,7 +939,7 @@ public void testCreatePartitionedTableAs() getSession().getSchema().orElseThrow(), "test_create_partitioned_table_as", format, - tempDirPath)); + location)); assertQuery("SELECT * from test_create_partitioned_table_as", "SELECT orderkey, shippriority, orderstatus FROM orders"); @@ -932,24 +967,23 @@ public void testColumnComments() @Test public void testTableComments() { - File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); - String tempDirPath = tempDir.toURI().toASCIIString() + randomTableSuffix(); + String location = createNewTableLocation(schemaName); String createTableTemplate = "" + - "CREATE TABLE iceberg.tpch.test_table_comments (\n" + + "CREATE TABLE iceberg." + schemaName + ".test_table_comments (\n" + " _x bigint\n" + ")\n" + "COMMENT '%s'\n" + "WITH (\n" + format(" format = '%s',\n", format) + - format(" location = '%s'\n", tempDirPath) + + format(" location = '%s'\n", location) + ")"; String createTableWithoutComment = "" + - "CREATE TABLE iceberg.tpch.test_table_comments (\n" + + "CREATE TABLE iceberg." + schemaName + ".test_table_comments (\n" + " _x bigint\n" + ")\n" + "WITH (\n" + " format = '" + format + "',\n" + - " location = '" + tempDirPath + "'\n" + + " location = '" + location + "'\n" + ")"; String createTableSql = format(createTableTemplate, "test table comment", format); assertUpdate(createTableSql); @@ -960,12 +994,12 @@ public void testTableComments() assertUpdate("COMMENT ON TABLE test_table_comments IS NULL"); assertEquals(computeScalar("SHOW CREATE TABLE test_table_comments"), createTableWithoutComment); - dropTable("iceberg.tpch.test_table_comments"); + dropTable("iceberg." 
+ schemaName + ".test_table_comments"); assertUpdate(createTableWithoutComment); assertEquals(computeScalar("SHOW CREATE TABLE test_table_comments"), createTableWithoutComment); - dropTable("iceberg.tpch.test_table_comments"); + dropTable("iceberg." + schemaName + ".test_table_comments"); } @Test @@ -980,10 +1014,10 @@ public void testRollbackSnapshot() assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))"); - assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId)); + assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", schemaName, afterFirstInsertId)); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))"); - assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterCreateTableId)); + assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", schemaName, afterCreateTableId)); assertEquals((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue(), 0); assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1); @@ -992,7 +1026,7 @@ public void testRollbackSnapshot() // extra insert which should be dropped on rollback assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1); - assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterSecondInsertId)); + assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", schemaName, afterSecondInsertId)); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))"); dropTable("test_rollback"); @@ -1088,21 +1122,18 @@ public void testLargeInOnPartitionedColumns() @Test public void testCreateTableLike() { - IcebergFileFormat otherFormat = (format == PARQUET) ? ORC : PARQUET; - testCreateTableLikeForFormat(otherFormat); - } - - private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) - { - File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); - String tempDirPath = tempDir.toURI().toASCIIString() + randomTableSuffix(); + String locationBase = createNewTableLocationBase(schemaName); + String location = createNewTableLocation(schemaName); - assertUpdate(format("CREATE TABLE test_create_table_like_original (col1 INTEGER, aDate DATE) WITH(format = '%s', location = '%s', partitioning = ARRAY['aDate'])", format, tempDirPath)); - assertEquals(getTablePropertiesString("test_create_table_like_original"), "WITH (\n" + - format(" format = '%s',\n", format) + - format(" location = '%s',\n", tempDirPath) + - " partitioning = ARRAY['adate']\n" + - ")"); + IcebergFileFormat otherFormat = (format == PARQUET) ? 
ORC : PARQUET; + assertUpdate(format("CREATE TABLE test_create_table_like_original (col1 INTEGER, aDate DATE) WITH(format = '%s', location = '%s', partitioning = ARRAY['aDate'])", format, location)); + assertEquals( + getTablePropertiesString("test_create_table_like_original"), + "WITH (\n" + + format(" format = '%s',%n", format) + + format(" location = '%s',%n", location) + + " partitioning = ARRAY['adate']\n" + + ")"); assertUpdate("CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)"); assertUpdate("INSERT INTO test_create_table_like_copy0 (col1, aDate, col2) VALUES (1, CAST('1950-06-28' AS DATE), 3)", 1); @@ -1111,18 +1142,18 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) assertUpdate("CREATE TABLE test_create_table_like_copy1 (LIKE test_create_table_like_original)"); assertEquals(getTablePropertiesString("test_create_table_like_copy1"), "WITH (\n" + - format(" format = '%s',\n location = '%s'\n)", format, tempDir + "/iceberg_data/tpch/test_create_table_like_copy1")); + format(" format = '%s',\n location = '%s'\n)", format, locationBase + "/test_create_table_like_copy1")); dropTable("test_create_table_like_copy1"); assertUpdate("CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)"); assertEquals(getTablePropertiesString("test_create_table_like_copy2"), "WITH (\n" + - format(" format = '%s',\n location = '%s'\n)", format, tempDir + "/iceberg_data/tpch/test_create_table_like_copy2")); + format(" format = '%s',\n location = '%s'\n)", format, locationBase + "/test_create_table_like_copy2")); dropTable("test_create_table_like_copy2"); assertUpdate("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)"); assertEquals(getTablePropertiesString("test_create_table_like_copy3"), "WITH (\n" + format(" format = '%s',\n", format) + - format(" location = '%s',\n", tempDirPath) + + format(" location = '%s',\n", location) + " partitioning = ARRAY['adate']\n" + ")"); dropTable("test_create_table_like_copy3"); @@ -1130,7 +1161,7 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) assertUpdate(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat)); assertEquals(getTablePropertiesString("test_create_table_like_copy4"), "WITH (\n" + format(" format = '%s',\n", otherFormat) + - format(" location = '%s',\n", tempDirPath) + + format(" location = '%s',\n", location) + " partitioning = ARRAY['adate']\n" + ")"); dropTable("test_create_table_like_copy4"); @@ -1960,13 +1991,13 @@ public void testMultipleColumnTableStatistics() @Test public void testPartitionedTableStatistics() { - assertUpdate("CREATE TABLE iceberg.tpch.test_partitioned_table_statistics (col1 REAL, col2 BIGINT) WITH (partitioning = ARRAY['col2'])"); + assertUpdate("CREATE TABLE iceberg." + schemaName + ".test_partitioned_table_statistics (col1 REAL, col2 BIGINT) WITH (partitioning = ARRAY['col2'])"); String insertStart = "INSERT INTO test_partitioned_table_statistics"; assertUpdate(insertStart + " VALUES (-10, -1)", 1); assertUpdate(insertStart + " VALUES (100, 10)", 1); - MaterializedResult result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); + MaterializedResult result = computeActual("SHOW STATS FOR iceberg." 
+ schemaName + ".test_partitioned_table_statistics"); assertEquals(result.getRowCount(), 3); MaterializedRow row0 = result.getMaterializedRows().get(0); @@ -1992,7 +2023,7 @@ public void testPartitionedTableStatistics() .mapToObj(i -> "(NULL, 10)") .collect(joining(", ")), 5); - result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); + result = computeActual("SHOW STATS FOR iceberg." + schemaName + ".test_partitioned_table_statistics"); assertEquals(result.getRowCount(), 3); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); @@ -2013,7 +2044,7 @@ public void testPartitionedTableStatistics() .mapToObj(i -> "(100, NULL)") .collect(joining(", ")), 5); - result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); + result = computeActual("SHOW STATS FOR iceberg." + schemaName + ".test_partitioned_table_statistics"); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); assertEquals(row0.getField(3), 5.0 / 17.0); @@ -2029,17 +2060,16 @@ public void testPartitionedTableStatistics() row2 = result.getMaterializedRows().get(2); assertEquals(row2.getField(4), 17.0); - dropTable("iceberg.tpch.test_partitioned_table_statistics"); + dropTable("iceberg." + schemaName + ".test_partitioned_table_statistics"); } @Test public void testStatisticsConstraints() { - String tableName = "iceberg.tpch.test_simple_partitioned_table_statistics"; - assertUpdate("CREATE TABLE iceberg.tpch.test_simple_partitioned_table_statistics (col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])"); + String tableName = "iceberg." + schemaName + ".test_simple_partitioned_table_statistics"; + assertUpdate("CREATE TABLE " + tableName + " (col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])"); - String insertStart = "INSERT INTO iceberg.tpch.test_simple_partitioned_table_statistics"; - assertUpdate(insertStart + " VALUES (1, 101), (2, 102), (3, 103), (4, 104)", 4); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 101), (2, 102), (3, 103), (4, 104)", 4); TableStatistics tableStatistics = getTableStatistics(tableName, new Constraint(TupleDomain.all())); IcebergColumnHandle col1Handle = getColumnHandleFromStatistics(tableStatistics, "col1"); IcebergColumnHandle col2Handle = getColumnHandleFromStatistics(tableStatistics, "col2"); @@ -2071,7 +2101,7 @@ public void testStatisticsConstraints() @Test public void testPredicatePushdown() { - QualifiedObjectName tableName = new QualifiedObjectName("iceberg", "tpch", "test_predicate"); + QualifiedObjectName tableName = new QualifiedObjectName("iceberg", schemaName, "test_predicate"); assertUpdate(format("CREATE TABLE %s (col1 BIGINT, col2 BIGINT, col3 BIGINT) WITH (partitioning = ARRAY['col2', 'col3'])", tableName)); assertUpdate(format("INSERT INTO %s VALUES (1, 10, 100)", tableName), 1L); assertUpdate(format("INSERT INTO %s VALUES (2, 20, 200)", tableName), 1L); @@ -2464,12 +2494,18 @@ public void testFileSizeInManifest() Long fileSizeInBytes = (Long) row.getField(2); totalRecordCount += recordCount; - assertThat(fileSizeInBytes).isEqualTo(Files.size(Paths.get(path))); + assertThat(fileSizeInBytes).isEqualTo(calculateFileSystemFileSize(path)); } // Verify sum(record_count) to make sure we have all the files. 
assertThat(totalRecordCount).isEqualTo(2); } + protected Long calculateFileSystemFileSize(String filePath) + throws IOException + { + return Files.size(Paths.get(filePath)); + } + @Test public void testIncorrectIcebergFileSizes() throws Exception @@ -2486,7 +2522,8 @@ public void testIncorrectIcebergFileSizes() // Read manifest file Schema schema; GenericData.Record entry = null; - try (DataFileReader dataFileReader = new DataFileReader<>(new File(manifestFile), new GenericDatumReader<>())) { + try (DataFileStream dataFileReader = + new DataFileStream<>(createDataFileStreamForFileSystem(manifestFile), new GenericDatumReader<>())) { schema = dataFileReader.getSchema(); int recordCount = 0; while (dataFileReader.hasNext()) { @@ -2529,6 +2566,12 @@ public void testIncorrectIcebergFileSizes() dropTable("test_iceberg_file_size"); } + protected InputStream createDataFileStreamForFileSystem(String filePath) + throws FileNotFoundException + { + return new BufferedInputStream(new FileInputStream(filePath)); + } + @Test public void testSplitPruningForFilterOnPartitionColumn() { @@ -2753,7 +2796,7 @@ public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin() "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = " + totalPrice); OperatorStats probeStats = searchScanFilterAndProjectOperatorStats( result.getQueryId(), - new QualifiedObjectName(ICEBERG_CATALOG, "tpch", "lineitem")); + new QualifiedObjectName(ICEBERG_CATALOG, schemaName, "lineitem")); // Assert some lineitem rows were filtered out on file level assertThat(probeStats.getInputPositions()).isLessThan(fullTableScan); @@ -2762,13 +2805,13 @@ public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin() @Test(dataProvider = "repartitioningDataProvider") public void testRepartitionDataOnCtas(Session session, String partitioning, int expectedFiles) { - testRepartitionData(session, "tpch.tiny.orders", true, partitioning, expectedFiles); + testRepartitionData(session, "SELECT * FROM tpch.tiny.orders", true, partitioning, expectedFiles); } @Test(dataProvider = "repartitioningDataProvider") public void testRepartitionDataOnInsert(Session session, String partitioning, int expectedFiles) { - testRepartitionData(session, "tpch.tiny.orders", false, partitioning, expectedFiles); + testRepartitionData(session, "SELECT * FROM tpch.tiny.orders", false, partitioning, expectedFiles); } @DataProvider @@ -2790,9 +2833,9 @@ public Object[][] repartitioningDataProvider() // varchar-based {defaultSession, "'truncate(comment, 1)'", 35}, // complex; would exceed 100 open writers limit in IcebergPageSink without write repartitioning - {defaultSession, "'bucket(custkey, 4)', 'truncate(comment, 1)'", 131}, + {defaultSession, TEST_REPARTITION_COMPLEX, 131}, // same column multiple times - {defaultSession, "'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'", 180}, + {defaultSession, TEST_REPARTITION_SAME_COLUMN_MULTIPLE_TIMES, 180}, }; } @@ -2817,7 +2860,7 @@ private void testStatsBasedRepartitionData(boolean ctas) .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "5") .build(); // Use DISTINCT to add data redistribution between source table and the writer. This makes it more likely that all writers get some data. 
- String sourceRelation = "(SELECT DISTINCT orderkey, custkey, orderstatus FROM tpch.tiny.orders)"; + String sourceRelation = "SELECT DISTINCT orderkey, custkey, orderstatus FROM tpch.tiny.orders"; testRepartitionData( sessionRepartitionSmall, sourceRelation, @@ -2836,37 +2879,37 @@ private void testStatsBasedRepartitionData(boolean ctas) }); } - private void testRepartitionData(Session session, String sourceRelation, boolean ctas, String partitioning, int expectedFiles) + protected void testRepartitionData(Session session, String sourceQuery, boolean ctas, String partitioning, int expectedFiles) { String tableName = "repartition" + - "_" + sourceRelation.replaceAll("[^a-zA-Z0-9]", "") + + "_" + sourceQuery.replaceAll("[^a-zA-Z0-9]", "") + (ctas ? "ctas" : "insert") + "_" + partitioning.replaceAll("[^a-zA-Z0-9]", "") + "_" + randomTableSuffix(); - long rowCount = (long) computeScalar(session, "SELECT count(*) FROM " + sourceRelation); + long rowCount = computeActual(sourceQuery).getRowCount(); if (ctas) { assertUpdate( session, "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY[" + partitioning + "]) " + - "AS SELECT * FROM " + sourceRelation, + "AS " + sourceQuery, rowCount); } else { assertUpdate( session, "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY[" + partitioning + "]) " + - "AS SELECT * FROM " + sourceRelation + " WITH NO DATA", + "AS " + sourceQuery + " WITH NO DATA", 0); // Use source table big enough so that there will be multiple pages being written. - assertUpdate(session, "INSERT INTO " + tableName + " SELECT * FROM " + sourceRelation, rowCount); + assertUpdate(session, "INSERT INTO " + tableName + " " + sourceQuery, rowCount); } // verify written data assertThat(query(session, "TABLE " + tableName)) .skippingTypesCheck() - .matches("SELECT * FROM " + sourceRelation); + .matches(sourceQuery); // verify data files, i.e. repartitioning took place assertThat(query(session, "SELECT count(*) FROM \"" + tableName + "$files\"")) @@ -3308,7 +3351,7 @@ private List getActiveFiles(String tableName) .collect(toImmutableList()); } - private List getAllDataFilesFromTableDirectory(String tableName) + protected List getAllDataFilesFromTableDirectory(String tableName) throws IOException { String schema = getSession().getSchema().orElseThrow(); @@ -3327,7 +3370,7 @@ public void testOptimizeParameterValidation() { assertQueryFails( "ALTER TABLE no_such_table_exists EXECUTE OPTIMIZE", - "\\Qline 1:1: Table 'iceberg.tpch.no_such_table_exists' does not exist"); + "\\Qline 1:1: Table 'iceberg." + schemaName + ".no_such_table_exists' does not exist"); assertQueryFails( "ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33')", "\\QUnable to set catalog 'iceberg' table procedure 'OPTIMIZE' property 'file_size_threshold' to ['33']: size is not a valid data size string: 33"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorTest.java new file mode 100644 index 000000000000..7a3aebe31006 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorTest.java @@ -0,0 +1,203 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.plugin.hive.containers.HiveMinioDataLake; +import io.trino.testing.QueryRunner; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.BufferedInputStream; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.containers.HiveMinioDataLake.ACCESS_KEY; +import static io.trino.plugin.hive.containers.HiveMinioDataLake.SECRET_KEY; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class BaseIcebergMinioConnectorTest + extends BaseIcebergConnectorTest +{ + private static final List TEST_REPARTITION_EXCLUSIONS = ImmutableList.of( + TEST_REPARTITION_COMPLEX, + TEST_REPARTITION_SAME_COLUMN_MULTIPLE_TIMES); + + private final String bucketName; + private HiveMinioDataLake hiveMinioDataLake; + + public BaseIcebergMinioConnectorTest(IcebergFileFormat format) + { + super(format); + this.bucketName = "test-iceberg-minio-integration-test-" + randomTableSuffix(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + this.hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName, ImmutableMap.of())); + this.hiveMinioDataLake.start(); + return createQueryRunner( + ImmutableMap.builder() + .put("iceberg.catalog.type", "HIVE_METASTORE") + .put("hive.metastore.uri", "thrift://" + hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint()) + .put("hive.s3.aws-access-key", ACCESS_KEY) + .put("hive.s3.aws-secret-key", SECRET_KEY) + .put("hive.s3.endpoint", "http://" + hiveMinioDataLake.getMinio().getMinioApiEndpoint()) + .put("hive.s3.path-style-access", "true") + .put("hive.s3.streaming.part-size", "5MB") + .buildOrThrow()); + } + + @Override + protected SchemaInitializer.Builder createSchemaInitializer(String schemaName) + { + return super.createSchemaInitializer(schemaName) + .withSchemaProperties(ImmutableMap.of("location", "'s3://" + bucketName + "/" + schemaName + "'")); + } + + @Override + protected String createSchemaSql(String schemaName) + { + return "CREATE SCHEMA IF NOT EXISTS " + schemaName + " WITH (location = 's3://" + bucketName + "/" + schemaName + "')"; + } + + @Override + protected String createNewTableLocationBase(String schemaName) + { + return "s3://" + bucketName + "/" + schemaName; + } + + @Override + protected Long calculateFileSystemFileSize(String filePath) + { + return this.hiveMinioDataLake.getS3Client() + .getObject(bucketName, filePath.replace("s3://" + bucketName + "/", "")) + .getObjectMetadata() + .getContentLength(); + } + + @Override + protected BufferedInputStream createDataFileStreamForFileSystem(String filePath) + { + return new BufferedInputStream(this.hiveMinioDataLake.getS3Client() + .getObject(bucketName, filePath.replace("s3://" + bucketName + "/", 
"")).getObjectContent()); + } + + @Override + protected List getAllDataFilesFromTableDirectory(String tableName) + { + String tableLocation = getSession().getSchema().orElseThrow() + "/" + tableName + "/data"; + return this.hiveMinioDataLake.getS3Client().listObjects(bucketName, tableLocation) + .getObjectSummaries() + .stream() + .map(object -> "s3://" + bucketName + "/" + object.getKey()) + .filter(object -> !object.endsWith(".crc")) + .collect(toImmutableList()); + } + + /** + * DataProvider overload to exclude 2 test cases from generic tests: + * - {@link BaseIcebergConnectorTest#testRepartitionDataOnCtas(Session, String, int)} + * - {@link BaseIcebergConnectorTest#testRepartitionDataOnInsert(Session, String, int)} + *
+     * 1. partitioning -> 'bucket(custkey, 4)', 'truncate(comment, 1)'
+     * The test environment shows higher memory usage, reaching over 1.5 GB per test case.
+     * This leads to test flakiness when both test classes run in parallel and may consume
+     * over 3 GB, which is the current hard limit for CI and may end up with an OOM.
+     *
+     * Limiting the dataset to 300 rows in a dedicated test method for MinIO solves this issue:
+     * - {@link BaseIcebergMinioConnectorTest#testRepartitionOnMinio()}
+     *
+     * 2. partitioning -> 'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'
+     * The test environment causes HMS to take more time during the createTable operation.
+     * This leads to read timeouts, and the retried operation fails with TableAlreadyExists
+     * (HMS managed to complete the operation on its side). As this behaviour wasn't observed
+     * in normal circumstances, the test was adapted to avoid it.
+     *
+ * Limiting dataset to 300 rows in dedicated test methods for MinIO solves this issue: + * - {@link BaseIcebergMinioConnectorTest#testRepartitionOnMinio()} ()} + */ + @Override + @DataProvider + public Object[][] repartitioningDataProvider() + { + Object[][] defaultTestsData = super.repartitioningDataProvider(); + List minioTestData = new ArrayList<>(); + for (Object[] testData : defaultTestsData) { + if (!TEST_REPARTITION_EXCLUSIONS.contains(testData[1])) { + minioTestData.add(testData); + } + } + return minioTestData.toArray(new Object[minioTestData.size()][]); + } + + @Test + public void testRepartitionOnMinio() + { + String sourceQuery = "SELECT * FROM tpch.tiny.orders ORDER BY orderkey LIMIT 300"; + // complex; would exceed 100 open writers limit in IcebergPageSink without write repartitioning + testRepartitionData(getSession(), sourceQuery, true, TEST_REPARTITION_COMPLEX, 84); + testRepartitionData(getSession(), sourceQuery, false, TEST_REPARTITION_COMPLEX, 84); + // with same column multiple times + testRepartitionData(getSession(), sourceQuery, true, TEST_REPARTITION_SAME_COLUMN_MULTIPLE_TIMES, 97); + testRepartitionData(getSession(), sourceQuery, false, TEST_REPARTITION_SAME_COLUMN_MULTIPLE_TIMES, 97); + } + + @Test + @Override + public void testShowCreateTable() + { + assertThat((String) computeScalar("SHOW CREATE TABLE region")) + .isEqualTo("" + + "CREATE TABLE iceberg." + schemaName + ".region (\n" + + " regionkey bigint,\n" + + " name varchar,\n" + + " comment varchar\n" + + ")\n" + + "WITH (\n" + + " format = '" + format + "',\n" + + " location = 's3://" + bucketName + "/" + schemaName + "/region'\n" + + ")"); + } + + @Test + @Override + public void testRenameSchema() + { + // Overridden because error message is different from upper test method + String schemaName = getSession().getSchema().orElseThrow(); + assertQueryFails( + format("ALTER SCHEMA %s RENAME TO %s", schemaName, schemaName + randomTableSuffix()), + "Hive metastore does not support renaming schemas"); + } + + @Test + @Override + public void testShowCreateSchema() + { + assertThat(computeActual("SHOW CREATE SCHEMA " + schemaName).getOnlyValue().toString()) + .matches("CREATE SCHEMA iceberg." + schemaName + "\n" + + "AUTHORIZATION USER user\n" + + "WITH \\(\n" + + "\\s+location = 's3://" + bucketName + "/" + schemaName + "'\n" + + "\\)"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtil.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtil.java new file mode 100644 index 000000000000..181c823eaf82 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtil.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import io.trino.Session; + +public final class IcebergTestUtil +{ + private IcebergTestUtil() + { + } + + public static boolean orcSupportsIcebergFileStatistics(String typeName) + { + return !(typeName.equalsIgnoreCase("varbinary")) && + !(typeName.equalsIgnoreCase("uuid")); + } + + public static boolean orcSupportsRowGroupStatistics(String typeName) + { + return !typeName.equalsIgnoreCase("varbinary"); + } + + public static Session orcWithSmallRowGroups(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "10") + .build(); + } + + public static boolean parquetSupportsIcebergFileStatistics(String typeName) + { + return true; + } + + public static boolean parquetSupportsRowGroupStatistics(String typeName) + { + return !(typeName.equalsIgnoreCase("varbinary") || + typeName.equalsIgnoreCase("time(6)") || + typeName.equalsIgnoreCase("timestamp(6) with time zone")); + } + + public static Session parquetWithSmallRowGroups(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty("iceberg", "parquet_writer_page_size", "100B") + .setCatalogSessionProperty("iceberg", "parquet_writer_block_size", "100B") + .setCatalogSessionProperty("iceberg", "parquet_writer_batch_size", "10") + .build(); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java new file mode 100644 index 000000000000..28ba804d86a1 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.Session; +import org.testng.annotations.Test; + +import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; +import static io.trino.plugin.iceberg.IcebergTestUtil.orcSupportsIcebergFileStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.orcSupportsRowGroupStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.orcWithSmallRowGroups; + +// Due to fact some tests running on MinIO are memory consuming, we want to prevent running them in parallel. 
+@Test(singleThreaded = true) +public class TestIcebergMinioOrcConnectorTest + extends BaseIcebergMinioConnectorTest +{ + public TestIcebergMinioOrcConnectorTest() + { + super(ORC); + } + + @Override + protected boolean supportsIcebergFileStatistics(String typeName) + { + return orcSupportsIcebergFileStatistics(typeName); + } + + @Override + protected boolean supportsRowGroupStatistics(String typeName) + { + return orcSupportsRowGroupStatistics(typeName); + } + + @Override + protected Session withSmallRowGroups(Session session) + { + return orcWithSmallRowGroups(session); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorTest.java new file mode 100644 index 000000000000..e208d0cf7c5d --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.Session; +import org.testng.annotations.Test; + +import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; +import static io.trino.plugin.iceberg.IcebergTestUtil.parquetSupportsIcebergFileStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.parquetSupportsRowGroupStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.parquetWithSmallRowGroups; + +// Due to fact some tests running on MinIO are memory consuming, we want to prevent running them in parallel. 
+@Test(singleThreaded = true) +public class TestIcebergMinioParquetConnectorTest + extends BaseIcebergMinioConnectorTest +{ + public TestIcebergMinioParquetConnectorTest() + { + super(PARQUET); + } + + @Override + protected boolean supportsIcebergFileStatistics(String typeName) + { + return parquetSupportsIcebergFileStatistics(typeName); + } + + @Override + protected boolean supportsRowGroupStatistics(String typeName) + { + return parquetSupportsRowGroupStatistics(typeName); + } + + @Override + protected Session withSmallRowGroups(Session session) + { + return parquetWithSmallRowGroups(session); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcConnectorTest.java index e942ad933d6d..612ab9db884f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcConnectorTest.java @@ -16,6 +16,9 @@ import io.trino.Session; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; +import static io.trino.plugin.iceberg.IcebergTestUtil.orcSupportsIcebergFileStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.orcSupportsRowGroupStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.orcWithSmallRowGroups; public class TestIcebergOrcConnectorTest extends BaseIcebergConnectorTest @@ -28,21 +31,18 @@ public TestIcebergOrcConnectorTest() @Override protected boolean supportsIcebergFileStatistics(String typeName) { - return !(typeName.equalsIgnoreCase("varbinary")) && - !(typeName.equalsIgnoreCase("uuid")); + return orcSupportsIcebergFileStatistics(typeName); } @Override protected boolean supportsRowGroupStatistics(String typeName) { - return !typeName.equalsIgnoreCase("varbinary"); + return orcSupportsRowGroupStatistics(typeName); } @Override protected Session withSmallRowGroups(Session session) { - return Session.builder(session) - .setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "10") - .build(); + return orcWithSmallRowGroups(session); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java index 2f888c0e4e8e..50986230ae9d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java @@ -22,6 +22,9 @@ import java.util.stream.IntStream; import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; +import static io.trino.plugin.iceberg.IcebergTestUtil.parquetSupportsIcebergFileStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.parquetSupportsRowGroupStatistics; +import static io.trino.plugin.iceberg.IcebergTestUtil.parquetWithSmallRowGroups; import static org.testng.Assert.assertEquals; public class TestIcebergParquetConnectorTest @@ -35,15 +38,19 @@ public TestIcebergParquetConnectorTest() @Override protected boolean supportsIcebergFileStatistics(String typeName) { - return true; + return parquetSupportsIcebergFileStatistics(typeName); } @Override protected boolean supportsRowGroupStatistics(String typeName) { - return !(typeName.equalsIgnoreCase("varbinary") || - typeName.equalsIgnoreCase("time(6)") || - typeName.equalsIgnoreCase("timestamp(6) with time zone")); + return 
parquetSupportsRowGroupStatistics(typeName); + } + + @Override + protected Session withSmallRowGroups(Session session) + { + return parquetWithSmallRowGroups(session); } @Test @@ -63,14 +70,4 @@ public void testRowGroupResetDictionary() assertEquals(result.getRowCount(), 100); } } - - @Override - protected Session withSmallRowGroups(Session session) - { - return Session.builder(session) - .setCatalogSessionProperty("iceberg", "parquet_writer_page_size", "100B") - .setCatalogSessionProperty("iceberg", "parquet_writer_block_size", "100B") - .setCatalogSessionProperty("iceberg", "parquet_writer_batch_size", "10") - .build(); - } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index c35eb3ab1ebc..03d8ebcc9679 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -117,6 +117,11 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) return connectorBehavior.hasBehaviorByDefault(this::hasBehavior); } + protected String createSchemaSql(String schemaName) + { + return "CREATE SCHEMA IF NOT EXISTS " + schemaName; + } + @Test @Override public void ensureTestNamingConvention() @@ -149,12 +154,14 @@ public void testShowCreateSchema() public void testCreateSchema() { String schemaName = "test_schema_create_" + randomTableSuffix(); + String creteSchemaSql = createSchemaSql(schemaName); + if (!hasBehavior(SUPPORTS_CREATE_SCHEMA)) { - assertQueryFails("CREATE SCHEMA " + schemaName, "This connector does not support creating schemas"); + assertQueryFails(creteSchemaSql, "This connector does not support creating schemas"); return; } assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).doesNotContain(schemaName); - assertUpdate("CREATE SCHEMA " + schemaName); + assertUpdate(creteSchemaSql); // verify listing of new schema assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(schemaName); @@ -183,7 +190,7 @@ public void testDropNonEmptySchema() } try { - assertUpdate("CREATE SCHEMA " + schemaName); + assertUpdate(createSchemaSql(schemaName)); assertUpdate("CREATE TABLE " + schemaName + ".t(x int)"); assertQueryFails("DROP SCHEMA " + schemaName, ".*Cannot drop non-empty schema '\\Q" + schemaName + "\\E'"); } @@ -1072,7 +1079,7 @@ public void testRenameMaterializedView() assertTestingMaterializedViewQuery(schema, uppercaseName.toLowerCase(ENGLISH)); // Ensure select allows for lower-case, not delimited identifier String otherSchema = "rename_mv_other_schema"; - assertUpdate(format("CREATE SCHEMA IF NOT EXISTS %s", otherSchema)); + assertUpdate(createSchemaSql(otherSchema)); if (hasBehavior(SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS)) { assertUpdate(session, "ALTER MATERIALIZED VIEW " + uppercaseName + " RENAME TO " + otherSchema + "." 
+ originalMaterializedView.getObjectName()); assertTestingMaterializedViewQuery(otherSchema, originalMaterializedView.getObjectName()); @@ -1106,7 +1113,7 @@ private void assertTestingMaterializedViewQuery(String schema, String materializ private void createTestingMaterializedView(QualifiedObjectName view, Optional comment) { - assertUpdate(format("CREATE SCHEMA IF NOT EXISTS %s", view.getSchemaName())); + assertUpdate(createSchemaSql(view.getSchemaName())); assertUpdate(format( "CREATE MATERIALIZED VIEW %s %s AS SELECT * FROM nation", view, @@ -1400,7 +1407,7 @@ public void testWriteNotAllowedInTransaction() { skipTestUnless(!hasBehavior(SUPPORTS_MULTI_STATEMENT_WRITES)); - assertWriteNotAllowedInTransaction(SUPPORTS_CREATE_SCHEMA, "CREATE SCHEMA write_not_allowed"); + assertWriteNotAllowedInTransaction(SUPPORTS_CREATE_SCHEMA, createSchemaSql("write_not_allowed")); assertWriteNotAllowedInTransaction(SUPPORTS_CREATE_TABLE, "CREATE TABLE write_not_allowed (x int)"); assertWriteNotAllowedInTransaction(SUPPORTS_CREATE_TABLE, "DROP TABLE region"); assertWriteNotAllowedInTransaction(SUPPORTS_CREATE_TABLE_WITH_DATA, "CREATE TABLE write_not_allowed AS SELECT * FROM region"); @@ -1438,7 +1445,7 @@ public void testRenameSchema() String schemaName = "test_rename_schema_" + randomTableSuffix(); try { - assertUpdate("CREATE SCHEMA " + schemaName); + assertUpdate(createSchemaSql(schemaName)); assertUpdate("ALTER SCHEMA " + schemaName + " RENAME TO " + schemaName + "_renamed"); } finally { @@ -1470,7 +1477,7 @@ public void testRenameTableAcrossSchema() assertUpdate("CREATE TABLE " + tableName + " AS SELECT 123 x", 1); String schemaName = "test_schema_" + randomTableSuffix(); - assertUpdate("CREATE SCHEMA " + schemaName); + assertUpdate(createSchemaSql(schemaName)); String renamedTable = schemaName + ".test_rename_new_" + randomTableSuffix(); assertUpdate("ALTER TABLE " + tableName + " RENAME TO " + renamedTable);