diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp
index 4e0b5cc..2b3ec86 100644
--- a/src/iceberg_functions/iceberg_scan.cpp
+++ b/src/iceberg_functions/iceberg_scan.cpp
@@ -129,18 +129,26 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {
 
 //! Build the Parquet Scan expression for the files we need to scan
 static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
-                                               vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) {
+                                               vector<IcebergColumnDefinition> &schema, bool allow_moved_paths,
+                                               string metadata_compression_codec, bool skip_schema_inference,
+                                               int64_t data_cardinality, int64_t delete_cardinality) {
+
+	auto cardinality = make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
+	                                                   make_uniq<ConstantExpression>(Value(data_cardinality)));
+
 	// No deletes, just return a TableFunctionRef for a parquet scan of the data files
 	if (delete_file_values.empty()) {
 		auto table_function_ref_data = make_uniq<TableFunctionRef>();
 		table_function_ref_data->alias = "iceberg_scan_data";
 		vector<unique_ptr<ParsedExpression>> left_children;
 		left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
+		left_children.push_back(std::move(cardinality));
 		if (!skip_schema_inference) {
 			left_children.push_back(
 			    make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
 			                                    make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
 		}
+
 		table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
 		return std::move(table_function_ref_data);
 	}
@@ -165,6 +173,7 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
 	table_function_ref_data->alias = "iceberg_scan_data";
 	vector<unique_ptr<ParsedExpression>> left_children;
 	left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
+	left_children.push_back(std::move(cardinality));
 	left_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL,
 	                                                        make_uniq<ColumnRefExpression>("filename"),
 	                                                        make_uniq<ConstantExpression>(Value(1))));
@@ -184,6 +193,8 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
 	table_function_ref_deletes->alias = "iceberg_scan_deletes";
 	vector<unique_ptr<ParsedExpression>> right_children;
 	right_children.push_back(make_uniq<ConstantExpression>(Value::LIST(delete_file_values)));
+	right_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
+	                                                         make_uniq<ConstantExpression>(Value(delete_cardinality))));
 	table_function_ref_deletes->function = make_uniq<FunctionExpression>("parquet_scan", std::move(right_children));
 	join_node->right = std::move(table_function_ref_deletes);
 
@@ -269,7 +280,19 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
 	if (mode == "list_files") {
 		return MakeListFilesExpression(data_file_values, delete_file_values);
 	} else if (mode == "default") {
-		return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference);
+		int64_t data_cardinality = 0, delete_cardinality = 0;
+		for(auto &manifest : iceberg_table.entries) {
+			for(auto &entry : manifest.manifest_entries) {
+				if (entry.status != IcebergManifestEntryStatusType::DELETED) {
+					if (entry.content == IcebergManifestEntryContentType::DATA) {
+						data_cardinality += entry.record_count;
+					} else { // DELETES
+						delete_cardinality += entry.record_count;
+					}
+				}
+			}
+		}
+		return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality);
 	} else {
 		throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
 	}
diff --git a/test/sql/iceberg_scan_generated_data_0_001.test b/test/sql/iceberg_scan_generated_data_0_001.test
index 6350e3d..ab5378f 100644
--- a/test/sql/iceberg_scan_generated_data_0_001.test
+++ b/test/sql/iceberg_scan_generated_data_0_001.test
@@ -1,4 +1,4 @@
-# name: test/sql/iceberg_scan_generated_data_0_001.test_slow
+# name: test/sql/iceberg_scan_generated_data_0_001.test
 # description: test iceberg extension with the sf0.001 generated test set
 # group: [iceberg]
 
@@ -106,3 +106,9 @@ statement error
 DESCRIBE SELECT schema_evol_added_col_1 FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table/metadata/v6.metadata.json') ORDER BY uuid;
 ----
 Binder Error
+
+# Check that the cardinality is injected
+query II
+EXPLAIN SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table');
+----
+physical_plan	<REGEX>:.* ANTI .*PARQUET_SCAN.*Rows.*Rows.*
\ No newline at end of file
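
The change sums record_count over the snapshot's live manifest entries and passes the totals to parquet_scan through the explicit_cardinality named parameter, so the optimizer sees a row estimate for the data scan (and, when delete files exist, for the delete-side scan of the ANTI join) instead of an unknown cardinality. A rough hand-written equivalent of the no-deletes path is sketched below; the file path and the 51793-row figure are illustrative placeholders, and the sketch assumes explicit_cardinality can also be supplied as a named parameter in SQL, as the bind code above implies.

    -- Illustrative sketch only: stands in for the TableFunctionRef built in MakeScanExpression.
    SELECT count(*)
    FROM parquet_scan(
        ['path/to/data-file-00000.parquet'],  -- data_file_values gathered from the snapshot
        explicit_cardinality = 51793          -- sum of record_count over live DATA manifest entries
    );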