Prepare on Hive in TestSyncPartitionMetadata
This makes it easy to run the test on ABFS. Also, rename the class.
ebyhr committed May 6, 2022
1 parent 4fc4fe8 commit 68ac7c8
Showing 2 changed files with 178 additions and 65 deletions.
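
The substance of the change is in the diff below: test data is now prepared through Hive rather than through tempto's HDFS-specific HdfsClient and HdfsDataSourceWriter. Hive's "dfs" command runs a Hadoop FsShell command against whatever file system backs the warehouse, which is what makes the tests runnable on ABFS. A condensed sketch of the pattern, using the helper names from the new code (it assumes the tempto product-test classpath for QueryExecutors.onHive):

// Run a Hadoop FsShell command through Hive, so the same test code works
// on any Hadoop-compatible file system (HDFS, ABFS, ...), not just HDFS.
private void onHdfs(String command)
{
    onHive().executeQuery("dfs " + command);
}

// Example usage, as in the new prepare() method:
//   onHdfs("-mkdir -p " + tableLocation);
//   onHdfs(format("-cp %s %s/col_x=f/col_y=9", filePath, tableLocation));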
BaseTestSyncPartitionMetadata.java (renamed from TestSyncPartitionMetadata.java)

@@ -13,60 +13,53 @@
*/
package io.trino.tests.product.hive;

- import com.google.inject.Inject;
- import com.google.inject.name.Named;
+ import io.trino.tempto.AfterTestWithContext;
+ import io.trino.tempto.BeforeTestWithContext;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
- import io.trino.tempto.fulfillment.table.hive.HiveDataSource;
- import io.trino.tempto.hadoop.hdfs.HdfsClient;
- import io.trino.tempto.internal.hadoop.hdfs.HdfsDataSourceWriter;
import io.trino.tempto.query.QueryResult;
- import io.trino.testng.services.Flaky;
- import org.testng.annotations.Test;

+ import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
- import static io.trino.tempto.fulfillment.table.hive.InlineDataSource.createResourceDataSource;
- import static io.trino.tests.product.TestGroups.HIVE_PARTITIONING;
- import static io.trino.tests.product.TestGroups.SMOKE;
- import static io.trino.tests.product.TestGroups.TRINO_JDBC;
- import static io.trino.tests.product.hive.HiveProductTest.ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE;
- import static io.trino.tests.product.hive.HiveProductTest.ERROR_COMMITTING_WRITE_TO_HIVE_MATCH;
- import static io.trino.tests.product.hive.util.TableLocationUtils.getTableLocation;
+ import static io.trino.tests.product.utils.QueryExecutors.onHive;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

- public class TestSyncPartitionMetadata
+ /**
+ * Please add @Test with groups in the subclass
+ */
+ public abstract class BaseTestSyncPartitionMetadata
extends ProductTest
{
- @Inject
- @Named("databases.hive.warehouse_directory_path")
- private String warehouseDirectory;

- @Inject
- private HdfsClient hdfsClient;
+ @BeforeTestWithContext
+ public void setUp()
+ {
+ onHdfs("-rm -f -r " + schemaLocation());
+ onHdfs("-mkdir -p " + schemaLocation());
+ }

- @Inject
- private HdfsDataSourceWriter hdfsDataSourceWriter;
+ @AfterTestWithContext
+ public void tearDown()
+ {
+ onHdfs("-rm -f -r " + schemaLocation());
+ }

- @Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testAddPartition()
{
String tableName = "test_sync_partition_metadata_add_partition";
- prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+ prepare(tableName);

onTrino().executeQuery("CALL system.sync_partition_metadata('default', '" + tableName + "', 'ADD')");
assertPartitions(tableName, row("a", "1"), row("b", "2"), row("f", "9"));
assertQueryFailure(() -> onTrino().executeQuery("SELECT payload, col_x, col_y FROM " + tableName + " ORDER BY 1, 2, 3 ASC"))
- .hasMessageContaining(format("Partition location does not exist: hdfs://hadoop-master:9000%s/%s/col_x=b/col_y=2", warehouseDirectory, tableName));
+ .hasMessageMatching(format(".*Partition location does not exist: .*%s/col_x=b/col_y=2", tableLocation(tableName)));
cleanup(tableName);
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testAddPartitionContainingCharactersThatNeedUrlEncoding()
{
String tableName = "test_sync_partition_metadata_add_partition_urlencode";
@@ -104,12 +97,10 @@ public void testAddPartitionContainingCharactersThatNeedUrlEncoding()
cleanup(tableName);
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testDropPartition()
{
String tableName = "test_sync_partition_metadata_drop_partition";
- prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+ prepare(tableName);

onTrino().executeQuery("CALL system.sync_partition_metadata('default', '" + tableName + "', 'DROP')");
assertPartitions(tableName, row("a", "1"));
@@ -118,8 +109,6 @@ public void testDropPartition()
cleanup(tableName);
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testDropPartitionContainingCharactersThatNeedUrlEncoding()
{
String tableName = "test_sync_partition_metadata_drop_partition_urlencode";
@@ -160,12 +149,10 @@ public void testDropPartitionContainingCharactersThatNeedUrlEncoding()
cleanup(tableName);
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testFullSyncPartition()
{
String tableName = "test_sync_partition_metadata_add_drop_partition";
- prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+ prepare(tableName);

onTrino().executeQuery("CALL system.sync_partition_metadata('default', '" + tableName + "', 'FULL')");
assertPartitions(tableName, row("a", "1"), row("f", "9"));
@@ -174,45 +161,44 @@ public void testFullSyncPartition()
cleanup(tableName);
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testInvalidSyncMode()
{
String tableName = "test_repair_invalid_mode";
- prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+ prepare(tableName);

assertQueryFailure(() -> onTrino().executeQuery("CALL system.sync_partition_metadata('default', '" + tableName + "', 'INVALID')"))
.hasMessageMatching("Query failed (.*): Invalid partition metadata sync mode: INVALID");

cleanup(tableName);
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testMixedCasePartitionNames()
{
String tableName = "test_sync_partition_mixed_case";
- prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+ prepare(tableName);
String tableLocation = tableLocation(tableName);
- HiveDataSource dataSource = createResourceDataSource(tableName, "io/trino/tests/product/hive/data/single_int_column/data.orc");
- hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/col_x=h/col_Y=11", dataSource);
- hdfsClient.createDirectory(tableLocation + "/COL_X=UPPER/COL_Y=12");
- hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/COL_X=UPPER/COL_Y=12", dataSource);
+
+ String dataSource = generateOrcFile();
+ onHdfs(format("-mkdir -p %s/col_x=h/col_Y=11", tableLocation));
+ onHdfs(format("-cp %s %s/col_x=h/col_Y=11", dataSource, tableLocation));
+
+ onHdfs(format("-mkdir -p %s/COL_X=UPPER/COL_Y=12", tableLocation));
+ onHdfs(format("-cp %s %s/COL_X=UPPER/COL_Y=12", dataSource, tableLocation));

onTrino().executeQuery("CALL system.sync_partition_metadata('default', '" + tableName + "', 'FULL', false)");
assertPartitions(tableName, row("UPPER", "12"), row("a", "1"), row("f", "9"), row("g", "10"), row("h", "11"));
assertData(tableName, row(1, "a", "1"), row(42, "UPPER", "12"), row(42, "f", "9"), row(42, "g", "10"), row(42, "h", "11"));
}

- @Test(groups = {HIVE_PARTITIONING, SMOKE})
- @Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
public void testConflictingMixedCasePartitionNames()
{
String tableName = "test_sync_partition_mixed_case";
- prepare(hdfsClient, hdfsDataSourceWriter, tableName);
- HiveDataSource dataSource = createResourceDataSource(tableName, "io/trino/tests/product/hive/data/single_int_column/data.orc");
+ String tableLocation = tableLocation(tableName);
+ prepare(tableName);
+ String dataSource = generateOrcFile();
// this conflicts with a partition that already exists in the metastore
- hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation(tableName) + "/COL_X=a/cOl_y=1", dataSource);
+ onHdfs(format("-mkdir -p %s/COL_X=a/cOl_y=1", tableLocation));
+ onHdfs(format("-cp %s %s/COL_X=a/cOl_y=1", dataSource, tableLocation));

assertThatThrownBy(() -> onTrino().executeQuery("CALL system.sync_partition_metadata('default', '" + tableName + "', 'ADD', false)"))
.hasMessageContaining(format("One or more partitions already exist for table 'default.%s'", tableName));
@@ -221,36 +207,56 @@ public void testConflictingMixedCasePartitionNames()

private String tableLocation(String tableName)
{
- return warehouseDirectory + '/' + tableName;
+ return schemaLocation() + '/' + tableName;
}

- private void prepare(HdfsClient hdfsClient, HdfsDataSourceWriter hdfsDataSourceWriter, String tableName)
+ protected abstract String schemaLocation();
+
+ private void prepare(String tableName)
{
+ String tableLocation = tableLocation(tableName);
onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName);
+ onHdfs("-rm -f -r " + tableLocation);
+ onHdfs("-mkdir -p " + tableLocation);
+
- onTrino().executeQuery("CREATE TABLE " + tableName + " (payload bigint, col_x varchar, col_y varchar) WITH (format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])");
+ onHive().executeQuery("CREATE TABLE " + tableName + " (payload bigint) PARTITIONED BY (col_x string, col_y string) STORED AS ORC LOCATION '" + tableLocation + "'");
onTrino().executeQuery("INSERT INTO " + tableName + " VALUES (1, 'a', '1'), (2, 'b', '2')");

- String tableLocation = tableLocation(tableName);
+ String filePath = generateOrcFile();
+
// remove partition col_x=b/col_y=2
- hdfsClient.delete(tableLocation + "/col_x=b/col_y=2");
+ onHdfs(format("-rm -f -r %s/col_x=b/col_y=2", tableLocation));
// add partition directory col_x=f/col_y=9 with single_int_column/data.orc file
- hdfsClient.createDirectory(tableLocation + "/col_x=f/col_y=9");
- HiveDataSource dataSource = createResourceDataSource(tableName, "io/trino/tests/product/hive/data/single_int_column/data.orc");
- hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/col_x=f/col_y=9", dataSource);
+ onHdfs(format("-mkdir -p %s/col_x=f/col_y=9", tableLocation));
+ onHdfs(format("-cp %s %s/col_x=f/col_y=9", filePath, tableLocation));

// should only be picked up when not in case sensitive mode
- hdfsClient.createDirectory(tableLocation + "/COL_X=g/col_y=10");
- hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/COL_X=g/col_y=10", dataSource);
+ onHdfs(format("-mkdir -p %s/COL_X=g/col_y=10", tableLocation));
+ onHdfs(format("-cp %s %s/COL_X=g/col_y=10", filePath, tableLocation));

// add invalid partition path
- hdfsClient.createDirectory(tableLocation + "/col_x=d");
- hdfsClient.createDirectory(tableLocation + "/col_y=3/col_x=h");
- hdfsClient.createDirectory(tableLocation + "/col_y=3");
- hdfsClient.createDirectory(tableLocation + "/xyz");
+ onHdfs(format("-mkdir -p %s/col_x=d", tableLocation));
+ onHdfs(format("-mkdir -p %s/col_y=3/col_x=h", tableLocation));
+ onHdfs(format("-mkdir -p %s/col_y=3", tableLocation));
+ onHdfs(format("-mkdir -p %s/xyz", tableLocation));

assertPartitions(tableName, row("a", "1"), row("b", "2"));
}

+ // Drop and create a table, then return the path of its single ORC file
+ private String generateOrcFile()
+ {
+ onTrino().executeQuery("DROP TABLE IF EXISTS single_int_column");
+ onTrino().executeQuery("CREATE TABLE single_int_column (payload bigint) WITH (format = 'ORC')");
+ onTrino().executeQuery("INSERT INTO single_int_column VALUES (42)");
+ return getOnlyElement(onTrino().executeQuery("SELECT \"$path\" FROM single_int_column").row(0)).toString();
+ }
+
+ private void onHdfs(String command)
+ {
+ onHive().executeQuery("dfs " + command);
+ }
+
private static void cleanup(String tableName)
{
onTrino().executeQuery("DROP TABLE " + tableName);
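Note: the helper assertions assertPartitions and assertData are outside the visible hunks. A plausible reconstruction, inferred from their call sites above, is shown here; the exact bodies in the repository may differ:

// Inferred sketch only; the real method bodies are collapsed in this diff.
// Trino exposes a table's partition values through the hidden "<table>$partitions" table.
private static void assertPartitions(String tableName, QueryAssert.Row... rows)
{
    QueryResult partitionListResult = onTrino().executeQuery("SELECT * FROM \"" + tableName + "$partitions\" ORDER BY 1, 2");
    assertThat(partitionListResult).containsExactlyInOrder(rows);
}

private static void assertData(String tableName, QueryAssert.Row... rows)
{
    QueryResult dataResult = onTrino().executeQuery("SELECT payload, col_x, col_y FROM " + tableName + " ORDER BY 1, 2, 3");
    assertThat(dataResult).containsExactlyInOrder(rows);
}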
TestHdfsSyncPartitionMetadata.java (new file)
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.hive;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.testng.services.Flaky;
import org.testng.annotations.Test;

import static io.trino.tests.product.TestGroups.HIVE_PARTITIONING;
import static io.trino.tests.product.TestGroups.SMOKE;
import static io.trino.tests.product.TestGroups.TRINO_JDBC;
import static io.trino.tests.product.hive.HiveProductTest.ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE;
import static io.trino.tests.product.hive.HiveProductTest.ERROR_COMMITTING_WRITE_TO_HIVE_MATCH;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static java.lang.String.format;

public class TestHdfsSyncPartitionMetadata
extends BaseTestSyncPartitionMetadata
{
@Inject
@Named("databases.hive.warehouse_directory_path")
private String warehouseDirectory;

private final String schema = "test_" + randomTableSuffix();

@Override
protected String schemaLocation()
{
return format("%s/%s", warehouseDirectory, schema);
}

@Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testAddPartition()
{
super.testAddPartition();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testAddPartitionContainingCharactersThatNeedUrlEncoding()
{
super.testAddPartitionContainingCharactersThatNeedUrlEncoding();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testDropPartition()
{
super.testDropPartition();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testDropPartitionContainingCharactersThatNeedUrlEncoding()
{
super.testDropPartitionContainingCharactersThatNeedUrlEncoding();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testFullSyncPartition()
{
super.testFullSyncPartition();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE, TRINO_JDBC})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testInvalidSyncMode()
{
super.testInvalidSyncMode();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testMixedCasePartitionNames()
{
super.testMixedCasePartitionNames();
}

@Test(groups = {HIVE_PARTITIONING, SMOKE})
@Flaky(issue = ERROR_COMMITTING_WRITE_TO_HIVE_ISSUE, match = ERROR_COMMITTING_WRITE_TO_HIVE_MATCH)
@Override
public void testConflictingMixedCasePartitionNames()
{
super.testConflictingMixedCasePartitionNames();
}
}
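
Since the schema location is now the only storage-specific piece, a variant for another file system only needs to override schemaLocation() and re-declare the @Test annotations with the appropriate groups, as the javadoc on the base class asks. A hypothetical ABFS subclass, assuming a product-test property that points at an abfs:// location (the class name and property name are illustrative, not part of this commit):

public class TestAbfsSyncPartitionMetadata
        extends BaseTestSyncPartitionMetadata
{
    @Inject
    @Named("databases.hive.abfs_schema_location") // hypothetical property
    private String schemaLocation;

    @Override
    protected String schemaLocation()
    {
        // e.g. abfs://container@account.dfs.core.windows.net/test
        return schemaLocation;
    }

    // Re-declare @Test/@Flaky overrides with the appropriate groups here,
    // mirroring TestHdfsSyncPartitionMetadata above.
}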
