From 3f2410ebb0a600293862839299ee6fa535546b4b Mon Sep 17 00:00:00 2001 From: peter Date: Wed, 9 Oct 2024 14:09:57 +0200 Subject: [PATCH 1/5] exploit iceberg row-count metadata Examine the row-counts in the manifest and count how many rows are in the (existing or added) data an deletion files. Use these two counts to pass the new named_parameter "explicit_cardinality" into the generated parquet_scans. Note that because iceberg is passing in an explicit schema into the parquet_scan, it does not open any file during bind. That means the parquet_scans it generates did not have any cardinality information during query optimization. That can cause rather bad query plans. This PR (pb/explicit-iceberg-cardinality) fixes that. Note that this PR needs the DuckDB PR pb/exicit-parquet-cardinality that adds the "explicit_cardinality" named_parameter to parquet_scan. --- src/iceberg_functions/iceberg_scan.cpp | 27 ++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index 4e0b5cc..72b3774 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -129,18 +129,26 @@ static Value GetParquetSchemaParam(vector &schema) { //! Build the Parquet Scan expression for the files we need to scan static unique_ptr MakeScanExpression(vector &data_file_values, vector &delete_file_values, - vector &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) { + vector &schema, bool allow_moved_paths, + string metadata_compression_codec, bool skip_schema_inference, + int64_t data_cardinality, int64_t delete_cardinality) { + + auto cardinality = make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), + make_uniq(Value(data_cardinality))); + // No deletes, just return a TableFunctionRef for a parquet scan of the data files if (delete_file_values.empty()) { auto table_function_ref_data = make_uniq(); table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; left_children.push_back(make_uniq(Value::LIST(data_file_values))); + left_children.push_back(move(cardinality)); if (!skip_schema_inference) { left_children.push_back( make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), make_uniq(GetParquetSchemaParam(schema)))); } + table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); return std::move(table_function_ref_data); } @@ -165,6 +173,7 @@ static unique_ptr MakeScanExpression(vector &data_file_values, table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; left_children.push_back(make_uniq(Value::LIST(data_file_values))); + left_children.push_back(move(cardinality)); left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("filename"), make_uniq(Value(1)))); @@ -184,6 +193,8 @@ static unique_ptr MakeScanExpression(vector &data_file_values, table_function_ref_deletes->alias = "iceberg_scan_deletes"; vector> right_children; right_children.push_back(make_uniq(Value::LIST(delete_file_values))); + right_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), + make_uniq(Value(delete_cardinality)))); table_function_ref_deletes->function = make_uniq("parquet_scan", std::move(right_children)); join_node->right = std::move(table_function_ref_deletes); @@ -269,7 +280,19 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table if (mode == "list_files") { return MakeListFilesExpression(data_file_values, delete_file_values); } else if (mode == "default") { - return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference); + int64_t data_cardinality = 0, delete_cardinality = 0; + for(auto &manifest : iceberg_table.entries) { + for(auto &entry : manifest.manifest_entries) { + if (entry.status != IcebergManifestEntryStatusType::DELETED) { + if (entry.content == IcebergManifestEntryContentType::DATA) { + data_cardinality += entry.record_count; + } else { // DELETES + delete_cardinality += entry.record_count; + } + } + } + } + return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality); } else { throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'"); } From bc2b45cd80093ff7fabb4a092da34efcf60a52c2 Mon Sep 17 00:00:00 2001 From: peter Date: Wed, 9 Oct 2024 15:18:14 +0200 Subject: [PATCH 2/5] std::move --- src/iceberg_functions/iceberg_scan.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index 72b3774..2b3ec86 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -142,7 +142,7 @@ static unique_ptr MakeScanExpression(vector &data_file_values, table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; left_children.push_back(make_uniq(Value::LIST(data_file_values))); - left_children.push_back(move(cardinality)); + left_children.push_back(std::move(cardinality)); if (!skip_schema_inference) { left_children.push_back( make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), @@ -173,7 +173,7 @@ static unique_ptr MakeScanExpression(vector &data_file_values, table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; left_children.push_back(make_uniq(Value::LIST(data_file_values))); - left_children.push_back(move(cardinality)); + left_children.push_back(std::move(cardinality)); left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("filename"), make_uniq(Value(1)))); From 136480d3504f1bf9e892276564ec1123e08215e4 Mon Sep 17 00:00:00 2001 From: peter Date: Fri, 11 Oct 2024 20:02:13 +0200 Subject: [PATCH 3/5] bogus commit to trigger CI --- src/iceberg_functions/iceberg_scan.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index 2b3ec86..d950f2a 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -15,8 +15,8 @@ #include "duckdb/parser/expression/star_expression.hpp" #include "duckdb/parser/tableref/subqueryref.hpp" #include "duckdb/parser/tableref/emptytableref.hpp" -#include "duckdb/planner/operator/logical_get.hpp" #include "duckdb/planner/operator/logical_comparison_join.hpp" +#include "duckdb/planner/operator/logical_get.hpp" #include "duckdb/common/file_opener.hpp" #include "duckdb/common/file_system.hpp" #include "iceberg_metadata.hpp" From 30a42d93a72054ce0fd4d3b49454834d69e0438a Mon Sep 17 00:00:00 2001 From: peter Date: Mon, 14 Oct 2024 14:30:54 +0200 Subject: [PATCH 4/5] bugus commit to trigger CI --- src/iceberg_functions/iceberg_scan.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index d950f2a..2b3ec86 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -15,8 +15,8 @@ #include "duckdb/parser/expression/star_expression.hpp" #include "duckdb/parser/tableref/subqueryref.hpp" #include "duckdb/parser/tableref/emptytableref.hpp" -#include "duckdb/planner/operator/logical_comparison_join.hpp" #include "duckdb/planner/operator/logical_get.hpp" +#include "duckdb/planner/operator/logical_comparison_join.hpp" #include "duckdb/common/file_opener.hpp" #include "duckdb/common/file_system.hpp" #include "iceberg_metadata.hpp" From 826397d8619a2e1a9cada64cb88d9d18dcd36bb2 Mon Sep 17 00:00:00 2001 From: peter Date: Tue, 15 Oct 2024 10:26:27 +0200 Subject: [PATCH 5/5] - correct test name - add test that checks that there are cardinalities in the generated parquet_scans --- test/sql/iceberg_scan_generated_data_0_001.test | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/test/sql/iceberg_scan_generated_data_0_001.test b/test/sql/iceberg_scan_generated_data_0_001.test index 6350e3d..ab5378f 100644 --- a/test/sql/iceberg_scan_generated_data_0_001.test +++ b/test/sql/iceberg_scan_generated_data_0_001.test @@ -1,4 +1,4 @@ -# name: test/sql/iceberg_scan_generated_data_0_001.test_slow +# name: test/sql/iceberg_scan_generated_data_0_001.test # description: test iceberg extension with the sf0.001 generated test set # group: [iceberg] @@ -106,3 +106,9 @@ statement error DESCRIBE SELECT schema_evol_added_col_1 FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table/metadata/v6.metadata.json') ORDER BY uuid; ---- Binder Error + +# Check that there are injected cardinality +query II +EXPLAIN SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table'); +---- +physical_plan :.* ANTI .*PARQUET_SCAN.*Rows.*Rows.* \ No newline at end of file