Skip to content

Commit

Permalink
Separate schemas in BaseSharedMetastoreTest
Browse files Browse the repository at this point in the history
Some tests expect only the region and nation
tables to exist in the schema.
  • Loading branch information
ebyhr committed May 14, 2024
1 parent e029973 commit 212455d
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
public abstract class BaseSharedMetastoreTest
extends AbstractTestQueryFramework
{
// NOTE(review): this appears to be the pre-change line of the diff; the two fields below
// (tpchSchema and testSchema) look like its replacement - confirm against the final file.
protected final String schema = "test_shared_schema_" + randomNameSuffix();
/**
* This schema should contain only nation and region tables. Use {@link #testSchema} when creating new tables.
*/
protected final String tpchSchema = "test_tpch_shared_schema_" + randomNameSuffix();
/**
 * Schema in which individual tests create their own tables (for example {@code nation_test}
 * and the {@code test_migrate_*} tables), so that {@link #tpchSchema} stays limited to
 * nation and region. The name gets a random suffix from {@code randomNameSuffix()}.
 */
protected final String testSchema = "test_mutable_shared_schema_" + randomNameSuffix();

protected abstract String getExpectedHiveCreateSchema(String catalogName);

Expand All @@ -37,47 +41,47 @@ public abstract class BaseSharedMetastoreTest
/**
 * Checks {@code SELECT *} on the shared nation and region tables through all four catalogs.
 * <p>
 * {@code iceberg} reads nation and {@code hive} reads region; both {@code *_with_redirections}
 * catalogs read both tables. Reading region through {@code iceberg} must fail with
 * "Not an Iceberg table", and reading nation through {@code hive} must fail with
 * "Cannot query Iceberg table".
 * <p>
 * NOTE(review): this listing appears to interleave pre-change lines (using {@code schema})
 * with their replacements (using {@code tpchSchema}).
 */
@Test
public void testSelect()
{
assertQuery("SELECT * FROM iceberg." + schema + ".nation", "SELECT * FROM nation");
assertQuery("SELECT * FROM hive." + schema + ".region", "SELECT * FROM region");
assertQuery("SELECT * FROM hive_with_redirections." + schema + ".nation", "SELECT * FROM nation");
assertQuery("SELECT * FROM hive_with_redirections." + schema + ".region", "SELECT * FROM region");
assertQuery("SELECT * FROM iceberg_with_redirections." + schema + ".nation", "SELECT * FROM nation");
assertQuery("SELECT * FROM iceberg_with_redirections." + schema + ".region", "SELECT * FROM region");

assertThat(query("SELECT * FROM iceberg." + schema + ".region"))
assertQuery("SELECT * FROM iceberg." + tpchSchema + ".nation", "SELECT * FROM nation");
assertQuery("SELECT * FROM hive." + tpchSchema + ".region", "SELECT * FROM region");
assertQuery("SELECT * FROM hive_with_redirections." + tpchSchema + ".nation", "SELECT * FROM nation");
assertQuery("SELECT * FROM hive_with_redirections." + tpchSchema + ".region", "SELECT * FROM region");
assertQuery("SELECT * FROM iceberg_with_redirections." + tpchSchema + ".nation", "SELECT * FROM nation");
assertQuery("SELECT * FROM iceberg_with_redirections." + tpchSchema + ".region", "SELECT * FROM region");

// The catalogs without redirections are each expected to fail on one of the two tables.
assertThat(query("SELECT * FROM iceberg." + tpchSchema + ".region"))
.failure().hasMessageContaining("Not an Iceberg table");
assertThat(query("SELECT * FROM hive." + schema + ".nation"))
assertThat(query("SELECT * FROM hive." + tpchSchema + ".nation"))
.failure().hasMessageContaining("Cannot query Iceberg table");
}

@Test
public void testReadInformationSchema()
{
assertThat(query("SELECT table_schema FROM hive.information_schema.tables WHERE table_name = 'region' AND table_schema='" + schema + "'"))
assertThat(query("SELECT table_schema FROM hive.information_schema.tables WHERE table_name = 'region' AND table_schema='" + tpchSchema + "'"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
assertThat(query("SELECT table_schema FROM iceberg.information_schema.tables WHERE table_name = 'nation' AND table_schema='" + schema + "'"))
.containsAll("VALUES '" + tpchSchema + "'");
assertThat(query("SELECT table_schema FROM iceberg.information_schema.tables WHERE table_name = 'nation' AND table_schema='" + tpchSchema + "'"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
assertThat(query("SELECT table_schema FROM hive_with_redirections.information_schema.tables WHERE table_name = 'region' AND table_schema='" + schema + "'"))
.containsAll("VALUES '" + tpchSchema + "'");
assertThat(query("SELECT table_schema FROM hive_with_redirections.information_schema.tables WHERE table_name = 'region' AND table_schema='" + tpchSchema + "'"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
assertThat(query("SELECT table_schema FROM hive_with_redirections.information_schema.tables WHERE table_name = 'nation' AND table_schema='" + schema + "'"))
.containsAll("VALUES '" + tpchSchema + "'");
assertThat(query("SELECT table_schema FROM hive_with_redirections.information_schema.tables WHERE table_name = 'nation' AND table_schema='" + tpchSchema + "'"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
assertThat(query("SELECT table_schema FROM iceberg_with_redirections.information_schema.tables WHERE table_name = 'region' AND table_schema='" + schema + "'"))
.containsAll("VALUES '" + tpchSchema + "'");
assertThat(query("SELECT table_schema FROM iceberg_with_redirections.information_schema.tables WHERE table_name = 'region' AND table_schema='" + tpchSchema + "'"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
.containsAll("VALUES '" + tpchSchema + "'");

assertQuery("SELECT table_name, column_name from hive.information_schema.columns WHERE table_schema = '" + schema + "'",
assertQuery("SELECT table_name, column_name from hive.information_schema.columns WHERE table_schema = '" + tpchSchema + "'",
"VALUES ('region', 'regionkey'), ('region', 'name'), ('region', 'comment')");
assertQuery("SELECT table_name, column_name from iceberg.information_schema.columns WHERE table_schema = '" + schema + "'",
assertQuery("SELECT table_name, column_name from iceberg.information_schema.columns WHERE table_schema = '" + tpchSchema + "'",
"VALUES ('nation', 'nationkey'), ('nation', 'name'), ('nation', 'regionkey'), ('nation', 'comment')");
assertQuery("SELECT table_name, column_name from hive_with_redirections.information_schema.columns WHERE table_schema = '" + schema + "'",
assertQuery("SELECT table_name, column_name from hive_with_redirections.information_schema.columns WHERE table_schema = '" + tpchSchema + "'",
"VALUES" +
"('region', 'regionkey'), ('region', 'name'), ('region', 'comment'), " +
"('nation', 'nationkey'), ('nation', 'name'), ('nation', 'regionkey'), ('nation', 'comment')");
assertQuery("SELECT table_name, column_name from iceberg_with_redirections.information_schema.columns WHERE table_schema = '" + schema + "'",
assertQuery("SELECT table_name, column_name from iceberg_with_redirections.information_schema.columns WHERE table_schema = '" + tpchSchema + "'",
"VALUES" +
"('region', 'regionkey'), ('region', 'name'), ('region', 'comment'), " +
"('nation', 'nationkey'), ('nation', 'name'), ('nation', 'regionkey'), ('nation', 'comment')");
Expand All @@ -86,19 +90,19 @@ public void testReadInformationSchema()
/**
 * Checks table listing and table metadata statements across the four catalogs.
 * <p>
 * {@code SHOW TABLES} must return exactly region and nation from every catalog, which is why
 * the schema used here must not receive tables created by other tests. {@code SHOW CREATE TABLE}
 * and {@code DESCRIBE} must fail for region through {@code iceberg} ("Not an Iceberg table")
 * and for nation through {@code hive} ("Cannot query Iceberg table").
 * <p>
 * NOTE(review): this listing appears to interleave pre-change lines (using {@code schema})
 * with their replacements (using {@code tpchSchema}).
 */
@Test
public void testShowTables()
{
assertQuery("SHOW TABLES FROM iceberg." + schema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM hive." + schema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM hive_with_redirections." + schema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM iceberg_with_redirections." + schema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM iceberg." + tpchSchema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM hive." + tpchSchema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM hive_with_redirections." + tpchSchema, "VALUES 'region', 'nation'");
assertQuery("SHOW TABLES FROM iceberg_with_redirections." + tpchSchema, "VALUES 'region', 'nation'");

// Both tables are listed above, but each non-redirecting catalog fails on one of them below.
assertThat(query("SHOW CREATE TABLE iceberg." + schema + ".region"))
assertThat(query("SHOW CREATE TABLE iceberg." + tpchSchema + ".region"))
.failure().hasMessageContaining("Not an Iceberg table");
assertThat(query("SHOW CREATE TABLE hive." + schema + ".nation"))
assertThat(query("SHOW CREATE TABLE hive." + tpchSchema + ".nation"))
.failure().hasMessageContaining("Cannot query Iceberg table");

assertThat(query("DESCRIBE iceberg." + schema + ".region"))
assertThat(query("DESCRIBE iceberg." + tpchSchema + ".region"))
.failure().hasMessageContaining("Not an Iceberg table");
assertThat(query("DESCRIBE hive." + schema + ".nation"))
assertThat(query("DESCRIBE hive." + tpchSchema + ".nation"))
.failure().hasMessageContaining("Cannot query Iceberg table");
}

Expand All @@ -107,97 +111,94 @@ public void testShowSchemas()
{
assertThat(query("SHOW SCHEMAS FROM hive"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
.containsAll("VALUES '" + tpchSchema + "'");
assertThat(query("SHOW SCHEMAS FROM iceberg"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
.containsAll("VALUES '" + tpchSchema + "'");
assertThat(query("SHOW SCHEMAS FROM hive_with_redirections"))
.skippingTypesCheck()
.containsAll("VALUES '" + schema + "'");
.containsAll("VALUES '" + tpchSchema + "'");

String showCreateHiveSchema = (String) computeActual("SHOW CREATE SCHEMA hive." + schema).getOnlyValue();
String showCreateHiveSchema = (String) computeActual("SHOW CREATE SCHEMA hive." + tpchSchema).getOnlyValue();
assertThat(showCreateHiveSchema).isEqualTo(getExpectedHiveCreateSchema("hive"));
String showCreateIcebergSchema = (String) computeActual("SHOW CREATE SCHEMA iceberg." + schema).getOnlyValue();
String showCreateIcebergSchema = (String) computeActual("SHOW CREATE SCHEMA iceberg." + tpchSchema).getOnlyValue();
assertThat(showCreateIcebergSchema).isEqualTo(getExpectedIcebergCreateSchema("iceberg"));
String showCreateHiveWithRedirectionsSchema = (String) computeActual("SHOW CREATE SCHEMA hive_with_redirections." + schema).getOnlyValue();
String showCreateHiveWithRedirectionsSchema = (String) computeActual("SHOW CREATE SCHEMA hive_with_redirections." + tpchSchema).getOnlyValue();
assertThat(showCreateHiveWithRedirectionsSchema).isEqualTo(getExpectedHiveCreateSchema("hive_with_redirections"));
String showCreateIcebergWithRedirectionsSchema = (String) computeActual("SHOW CREATE SCHEMA iceberg_with_redirections." + schema).getOnlyValue();
String showCreateIcebergWithRedirectionsSchema = (String) computeActual("SHOW CREATE SCHEMA iceberg_with_redirections." + tpchSchema).getOnlyValue();
assertThat(showCreateIcebergWithRedirectionsSchema).isEqualTo(getExpectedIcebergCreateSchema("iceberg_with_redirections"));
}

/**
 * Checks {@code FOR VERSION AS OF} and {@code FOR TIMESTAMP AS OF} queries issued through the
 * redirecting catalogs against a table created through {@code iceberg}.
 * <p>
 * The test creates {@code nation_test} from nation (25 rows) and inserts two more rows through
 * {@code hive_with_redirections}, recording a snapshot id and commit time after each write.
 * It then expects the nation contents at the first snapshot, 26 rows at the second and 27 at
 * the third. It also expects failures for an unknown snapshot id, for a timestamp of
 * 1970-01-01 00:00:00.001 Z ("No version history table ..."), and for a versioned read of region
 * through {@code iceberg_with_redirections} ("This connector does not support versioned tables").
 * <p>
 * NOTE(review): this listing appears to interleave pre-change lines (using a per-test
 * {@code testLocalSchema} that is created and dropped here, and {@code schema}) with their
 * replacements (using {@code testSchema} and {@code tpchSchema}).
 *
 * @throws InterruptedException if one of the {@code Thread.sleep} calls is interrupted
 */
@Test
public void testTimeTravelWithRedirection()
throws InterruptedException
{
String testLocalSchema = "test_schema_" + randomNameSuffix();
try {
assertUpdate("CREATE SCHEMA iceberg. " + testLocalSchema);
assertUpdate(format("CREATE TABLE iceberg.%s.nation_test AS SELECT * FROM nation", testLocalSchema), 25);
assertQuery("SELECT * FROM hive_with_redirections." + testLocalSchema + ".nation_test", "SELECT * FROM nation");
long snapshot1 = getLatestSnapshotId(testLocalSchema);
long v1EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot1, testLocalSchema);
assertUpdate(format("CREATE TABLE iceberg.%s.nation_test AS SELECT * FROM nation", testSchema), 25);
assertQuery("SELECT * FROM hive_with_redirections." + testSchema + ".nation_test", "SELECT * FROM nation");
// getLatestSnapshotId and getCommittedAtInEpochMilliSeconds are helpers not shown in this listing.
long snapshot1 = getLatestSnapshotId(testSchema);
long v1EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot1, testSchema);
// NOTE(review): the 1 ms sleeps presumably keep consecutive commit timestamps distinct - confirm.
Thread.sleep(1);
assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(25, 'POLAND', 3, 'test 1')", testLocalSchema), 1);
long snapshot2 = getLatestSnapshotId(testLocalSchema);
long v2EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot2, testLocalSchema);
assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(25, 'POLAND', 3, 'test 1')", testSchema), 1);
long snapshot2 = getLatestSnapshotId(testSchema);
long v2EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot2, testSchema);
Thread.sleep(1);
assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(26, 'CHILE', 1, 'test 2')", testLocalSchema), 1);
long snapshot3 = getLatestSnapshotId(testLocalSchema);
long v3EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot3, testLocalSchema);
assertUpdate(format("INSERT INTO hive_with_redirections.%s.nation_test VALUES(26, 'CHILE', 1, 'test 2')", testSchema), 1);
long snapshot3 = getLatestSnapshotId(testSchema);
long v3EpochMillis = getCommittedAtInEpochMilliSeconds(snapshot3, testSchema);
// Arbitrary id used below to exercise the "snapshot ID does not exists" failure path.
long incorrectSnapshot = 2324324333L;
Thread.sleep(1);
assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot1), "SELECT * FROM nation");
assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v1EpochMillis)), "SELECT * FROM nation");
assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot2), "VALUES(26)");
assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testSchema, snapshot1), "SELECT * FROM nation");
assertQuery(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testSchema, timestampLiteral(v1EpochMillis)), "SELECT * FROM nation");
assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testSchema, snapshot2), "VALUES(26)");
assertQuery(format(
"SELECT count(*) FROM iceberg_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v2EpochMillis)), "VALUES(26)");
assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, snapshot3), "VALUES(27)");
"SELECT count(*) FROM iceberg_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testSchema, timestampLiteral(v2EpochMillis)), "VALUES(26)");
assertQuery(format("SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testSchema, snapshot3), "VALUES(27)");
assertQuery(format(
"SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testLocalSchema, timestampLiteral(v3EpochMillis)), "VALUES(27)");
assertQueryFails(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testLocalSchema, incorrectSnapshot), "Iceberg snapshot ID does not exists: " + incorrectSnapshot);
"SELECT count(*) FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF %s", testSchema, timestampLiteral(v3EpochMillis)), "VALUES(27)");
assertQueryFails(format("SELECT * FROM hive_with_redirections.%s.nation_test FOR VERSION AS OF %d", testSchema, incorrectSnapshot), "Iceberg snapshot ID does not exists: " + incorrectSnapshot);
assertQueryFails(
format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", testLocalSchema),
format("\\QNo version history table \"%s\".\"nation_test\" at or before 1970-01-01T00:00:00.001Z", testLocalSchema));
format("SELECT * FROM hive_with_redirections.%s.nation_test FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", testSchema),
format("\\QNo version history table \"%s\".\"nation_test\" at or before 1970-01-01T00:00:00.001Z", testSchema));
// A versioned read of region through the redirecting Iceberg catalog is expected to be rejected.
assertQueryFails(
format("SELECT * FROM iceberg_with_redirections.%s.region FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", schema),
format("SELECT * FROM iceberg_with_redirections.%s.region FOR TIMESTAMP AS OF TIMESTAMP '1970-01-01 00:00:00.001000000 Z'", tpchSchema),
"\\QThis connector does not support versioned tables");
}
finally {
// Clean up even when an assertion above fails; IF EXISTS tolerates a failed CREATE TABLE.
assertUpdate("DROP TABLE IF EXISTS iceberg." + testLocalSchema + ".nation_test");
assertUpdate("DROP SCHEMA IF EXISTS iceberg." + testLocalSchema);
assertUpdate("DROP TABLE IF EXISTS iceberg." + testSchema + ".nation_test");
}
}

/**
 * Checks that a table created through {@code hive} becomes readable through {@code iceberg}
 * after {@code iceberg.system.migrate} is called on it.
 * <p>
 * Before migration, selecting through {@code iceberg} must fail with "Not an Iceberg table";
 * afterwards it must return the single row. The table is dropped at the end.
 * <p>
 * NOTE(review): this listing appears to interleave pre-change lines (using {@code schema} and
 * an unqualified {@code DROP TABLE}) with their replacements (using {@code testSchema} and the
 * fully qualified Iceberg table name).
 */
@Test
public void testMigrateTable()
{
String tableName = "test_migrate_" + randomNameSuffix();
String hiveTableName = "hive.%s.%s".formatted(schema, tableName);
String icebergTableName = "iceberg.%s.%s".formatted(schema, tableName);
String hiveTableName = "hive.%s.%s".formatted(testSchema, tableName);
String icebergTableName = "iceberg.%s.%s".formatted(testSchema, tableName);

assertUpdate("CREATE TABLE " + hiveTableName + " AS SELECT 1 id", 1);
assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*");

// migrate takes the schema name and the table name as separate arguments, not a qualified name.
assertUpdate("CALL iceberg.system.migrate('" + schema + "', '" + tableName + "')");
assertUpdate("CALL iceberg.system.migrate('" + testSchema + "', '" + tableName + "')");
assertQuery("SELECT * FROM " + icebergTableName, "VALUES 1");

assertUpdate("DROP TABLE " + tableName);
assertUpdate("DROP TABLE " + icebergTableName);
}

/**
 * Same check as the unpartitioned migrate test, but for a table created through {@code hive}
 * with {@code partitioned_by = ARRAY['part']}.
 * <p>
 * Before migration, selecting through {@code iceberg} must fail with "Not an Iceberg table";
 * afterwards it must return the row {@code (1, 'test')}. The table is dropped at the end.
 * <p>
 * NOTE(review): this listing appears to interleave pre-change lines (using {@code schema} and
 * an unqualified {@code DROP TABLE}) with their replacements (using {@code testSchema} and the
 * fully qualified Iceberg table name).
 */
@Test
public void testMigratePartitionedTable()
{
String tableName = "test_migrate_" + randomNameSuffix();
String hiveTableName = "hive.%s.%s".formatted(schema, tableName);
String icebergTableName = "iceberg.%s.%s".formatted(schema, tableName);
String hiveTableName = "hive.%s.%s".formatted(testSchema, tableName);
String icebergTableName = "iceberg.%s.%s".formatted(testSchema, tableName);

assertUpdate("CREATE TABLE " + hiveTableName + " WITH (partitioned_by = ARRAY['part']) AS SELECT 1 id, 'test' part", 1);
assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*");

// migrate takes the schema name and the table name as separate arguments, not a qualified name.
assertUpdate("CALL iceberg.system.migrate('" + schema + "', '" + tableName + "')");
assertUpdate("CALL iceberg.system.migrate('" + testSchema + "', '" + tableName + "')");
assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'test')");

assertUpdate("DROP TABLE " + tableName);
assertUpdate("DROP TABLE " + icebergTableName);
}

private long getLatestSnapshotId(String schema)
Expand Down
Loading

0 comments on commit 212455d

Please sign in to comment.