Reduce column mapping mode usage in CDF tests
The remaining test cases cover column mapping mode with
UPDATE, DELETE, MERGE, non-lowercase columns, and partitioned tables.
ebyhr committed Sep 5, 2023
1 parent 5839030 commit c9af47f
Showing 1 changed file with 27 additions and 28 deletions.
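
For context, the dropped dataProvider = "columnMappingModeDataProvider" attribute is a TestNG data provider that re-ran each test once per column mapping mode. Below is a minimal sketch of that pattern, assuming the provider yields Delta Lake's three modes ('id', 'name', 'none'); the provider's actual body and the class name here are outside this diff.

import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ColumnMappingModeSketch
{
    // Hypothetical stand-in for the provider referenced by the removed attribute.
    @DataProvider(name = "columnMappingModeDataProvider")
    public Object[][] columnMappingModeDataProvider()
    {
        // Delta Lake's supported column mapping modes
        return new Object[][] {{"id"}, {"name"}, {"none"}};
    }

    // TestNG invokes this once per row from the provider; removing the
    // attribute (as in the diff below) collapses each test to a single run.
    @Test(dataProvider = "columnMappingModeDataProvider")
    public void testSomethingWithCdf(String columnMappingMode)
    {
        // ... create the table with 'delta.columnMapping.mode' = columnMappingMode
    }
}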
@@ -159,16 +159,16 @@ public void testUpdatePartitionedTableWithCdf(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled(String columnMappingMode)
+public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled()
{
String tableName = "test_updates_to_table_with_many_rows_inserted_in_one_query_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (col1 STRING, updated_column INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 5 WHERE col1 = 'testValue3'");
@@ -187,17 +187,17 @@ public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled(String columnMappingMode)
+public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled()
{
String tableName = "test_updates_to_partitioned_table_with_many_rows_inserted_in_one_query_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
"USING DELTA " +
"PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES " +
"('testValue1', 1, 'partition1'), " +
@@ -220,17 +220,17 @@ public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated(String columnMappingMode)
+public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated()
{
String tableName = "test_updates_partitioning_column_in_table_with_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
"USING DELTA " +
"PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES " +
"('testValue1', 1, 'partition1'), " +
@@ -258,15 +258,14 @@ public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated(String columnMappingMode)
+public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated()
{
String tableName = "test_updates_to_table_with_cdf_enabled_later_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (col1 STRING, updated_column INT) " +
"USING DELTA " +
"TBLPROPERTIES ('delta.columnMapping.mode' = '" + columnMappingMode + "') " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1)");
@@ -452,17 +451,17 @@ public void testMergeDeleteIntoTableWithCdfEnabled(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled(String columnMappingMode)
+public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled()
{
String targetTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_" + randomNameSuffix();
String sourceTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_data_table_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + targetTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");
onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'");
@@ -514,17 +513,17 @@ public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testDeleteFromNullPartitionWithCdfEnabled(String columnMappingMode)
+public void testDeleteFromNullPartitionWithCdfEnabled()
{
String tableName = "test_delete_from_null_partition_with_cdf_enabled" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
"USING DELTA " +
"PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES " +
"('testValue1', 1, 'partition1'), " +
@@ -553,14 +552,14 @@ public void testDeleteFromNullPartitionWithCdfEnabled(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testTurningOnAndOffCdfFromTrino(String columnMappingMode)
+public void testTurningOnAndOffCdfFromTrino()
{
String tableName = "test_turning_cdf_on_and_off_from_trino" + randomNameSuffix();
try {
onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (col1 VARCHAR, updated_column INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true, column_mapping_mode = '" + columnMappingMode + "')");
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true)");

assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString()).contains("change_data_feed_enabled = true");

@@ -617,17 +616,17 @@ public void testThatCdfDoesntWorkWhenPropertyIsNotSet()
assertThereIsNoCdfFileGenerated(tableName2, "change_data_feed_enabled = false");
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testTrinoCanReadCdfEntriesGeneratedByDelta(String columnMappingMode)
+public void testTrinoCanReadCdfEntriesGeneratedByDelta()
{
String targetTableName = "test_trino_can_read_cdf_entries_generated_by_delta_target_" + randomNameSuffix();
String sourceTableName = "test_trino_can_read_cdf_entries_generated_by_delta_source_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + targetTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true, 'delta.columnMapping.mode' = '" + columnMappingMode + "')");
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");
onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (page_id INT, page_url STRING, views INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'");
@@ -685,20 +684,20 @@ public void testTrinoCanReadCdfEntriesGeneratedByDelta(String columnMappingMode)
}
}

-@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
-public void testDeltaCanReadCdfEntriesGeneratedByTrino(String columnMappingMode)
+public void testDeltaCanReadCdfEntriesGeneratedByTrino()
{
String targetTableName = "test_delta_can_read_cdf_entries_generated_by_trino_target_" + randomNameSuffix();
String sourceTableName = "test_delta_can_read_cdf_entries_generated_by_trino_source_" + randomNameSuffix();
try {
onTrino().executeQuery("CREATE TABLE delta.default." + targetTableName + " (page_id INT, page_url VARCHAR, views INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName +
"', change_data_feed_enabled = true, column_mapping_mode = '" + columnMappingMode + "')");
"', change_data_feed_enabled = true)");

onTrino().executeQuery("CREATE TABLE delta.default." + sourceTableName + " (page_id INT, page_url VARCHAR, views INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName +
"', change_data_feed_enabled = true, column_mapping_mode = '" + columnMappingMode + "')");
"', change_data_feed_enabled = true)");

onTrino().executeQuery("INSERT INTO delta.default." + targetTableName + " VALUES (1, 'pageUrl1', 100), (2, 'pageUrl2', 200), (3, 'pageUrl3', 300)");
onTrino().executeQuery("INSERT INTO delta.default." + targetTableName + " VALUES (4, 'pageUrl4', 400)");
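
As a usage note, the CDF entries these tests assert on can be read from Trino through the Delta connector's documented table_changes table function. A minimal sketch, assuming the same onTrino() harness, the delta catalog and default schema used above, and a CDF-enabled table observed since version 0; the tests' exact assertions are outside this excerpt.

// Hypothetical read of the change data feed for targetTableName via Trino.
// Each row carries the Delta CDF metadata columns _change_type and _commit_version.
onTrino().executeQuery(
        "SELECT page_id, page_url, views, _change_type, _commit_version " +
        "FROM TABLE(delta.system.table_changes(" +
        "schema_name => 'default', table_name => '" + targetTableName + "', since_version => 0))");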
