Skip to content

Commit

Permalink
Enable listing of Iceberg tables created by Spark
Browse files Browse the repository at this point in the history
Engines like Spark and Flink set the table_type property to the uppercase
value 'ICEBERG', whereas Trino sets it to the lowercase 'iceberg'. This
commit changes Trino's listing behavior for Iceberg tables so that it
shows tables configured with either of the two values for the property.

Co-authored-by: Xingyuan Lin <[email protected]>
  • Loading branch information
phd3 and lxynov committed Apr 27, 2021
1 parent 28551f2 commit a2ea77b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
Expand All @@ -103,6 +104,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -152,7 +154,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
Expand Down Expand Up @@ -371,11 +372,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
List<SchemaTableName> tablesList = schemaName.map(Collections::singletonList)
.orElseGet(metastore::getAllDatabases)
.stream()
.flatMap(schema -> metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE).stream()
.map(table -> new SchemaTableName(schema, table))
.collect(toList())
.stream())
.collect(toList());
.flatMap(schema -> Stream.concat(
// Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because
// Trino uses lowercase value whereas Spark and Flink use uppercase.
// TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)),
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)))
.distinct()) // distinct() to avoid duplicates for case-insensitive HMS backends
.collect(toImmutableList());

schemaName.map(Collections::singletonList)
.orElseGet(metastore::getAllDatabases).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class TestSparkCompatibility
// see spark-defaults.conf
private static final String SPARK_CATALOG = "iceberg_test";
private static final String PRESTO_CATALOG = "iceberg";
private static final String TEST_SCHEMA_NAME = "default";

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoReadingSparkData()
Expand Down Expand Up @@ -406,14 +407,30 @@ public void testIdBasedFieldMapping()
assertEquals(result.column(1).get(0), expected.getValues().get(0));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testTrinoShowingSparkCreatedTables()
{
    // Spark writes table_type=ICEBERG (uppercase) while Trino writes table_type=iceberg
    // (lowercase); SHOW TABLES from Trino must list tables created by either engine.
    String sparkTable = "test_table_listing_for_spark";
    String trinoTable = "test_table_listing_for_trino";

    onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG", sparkTableName(sparkTable)));
    onTrino().executeQuery(format("CREATE TABLE %s (_integer INTEGER )", prestoTableName(trinoTable)));

    try {
        assertThat(onTrino().executeQuery(format("SHOW TABLES FROM %s LIKE '%s'", TEST_SCHEMA_NAME, "test_table_listing_for_%")))
                .containsOnly(row(sparkTable), row(trinoTable));
    }
    finally {
        // Drop the tables even when the assertion fails, so a rerun against the same
        // (shared) schema does not trip over leftover tables from a previous failure.
        onSpark().executeQuery("DROP TABLE " + sparkTableName(sparkTable));
        onTrino().executeQuery("DROP TABLE " + prestoTableName(trinoTable));
    }
}

// Builds the fully qualified "<catalog>.<schema>.<table>" name used in queries
// executed via Spark. The schema is the shared TEST_SCHEMA_NAME ("default").
private static String sparkTableName(String tableName)
{
    return format("%s.%s.%s", SPARK_CATALOG, TEST_SCHEMA_NAME, tableName);
}

// Builds the fully qualified "<catalog>.<schema>.<table>" name used in queries
// executed via Trino/Presto. The schema is the shared TEST_SCHEMA_NAME ("default").
private static String prestoTableName(String tableName)
{
    return format("%s.%s.%s", PRESTO_CATALOG, TEST_SCHEMA_NAME, tableName);
}

private io.trino.jdbc.Row.Builder rowBuilder()
Expand Down

0 comments on commit a2ea77b

Please sign in to comment.