Skip to content

Commit

Permalink
Merge pull request #74 from motherduckdb/pb/explicit-iceberg-cardinality
Browse files Browse the repository at this point in the history
exploit iceberg row-count metadata
  • Loading branch information
samansmink authored Oct 15, 2024
2 parents 31fa736 + 826397d commit d62d91d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
27 changes: 25 additions & 2 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,26 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {

//! Build the Parquet Scan expression for the files we need to scan
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) {
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths,
string metadata_compression_codec, bool skip_schema_inference,
int64_t data_cardinality, int64_t delete_cardinality) {

auto cardinality = make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
make_uniq<ConstantExpression>(Value(data_cardinality)));

// No deletes, just return a TableFunctionRef for a parquet scan of the data files
if (delete_file_values.empty()) {
auto table_function_ref_data = make_uniq<TableFunctionRef>();
table_function_ref_data->alias = "iceberg_scan_data";
vector<unique_ptr<ParsedExpression>> left_children;
left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
left_children.push_back(std::move(cardinality));
if (!skip_schema_inference) {
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
}

table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
return std::move(table_function_ref_data);
}
Expand All @@ -165,6 +173,7 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
table_function_ref_data->alias = "iceberg_scan_data";
vector<unique_ptr<ParsedExpression>> left_children;
left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
left_children.push_back(std::move(cardinality));
left_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL,
make_uniq<ColumnRefExpression>("filename"),
make_uniq<ConstantExpression>(Value(1))));
Expand All @@ -184,6 +193,8 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
table_function_ref_deletes->alias = "iceberg_scan_deletes";
vector<unique_ptr<ParsedExpression>> right_children;
right_children.push_back(make_uniq<ConstantExpression>(Value::LIST(delete_file_values)));
right_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
make_uniq<ConstantExpression>(Value(delete_cardinality))));
table_function_ref_deletes->function = make_uniq<FunctionExpression>("parquet_scan", std::move(right_children));
join_node->right = std::move(table_function_ref_deletes);

Expand Down Expand Up @@ -269,7 +280,19 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
if (mode == "list_files") {
return MakeListFilesExpression(data_file_values, delete_file_values);
} else if (mode == "default") {
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference);
int64_t data_cardinality = 0, delete_cardinality = 0;
for(auto &manifest : iceberg_table.entries) {
for(auto &entry : manifest.manifest_entries) {
if (entry.status != IcebergManifestEntryStatusType::DELETED) {
if (entry.content == IcebergManifestEntryContentType::DATA) {
data_cardinality += entry.record_count;
} else { // DELETES
delete_cardinality += entry.record_count;
}
}
}
}
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality);
} else {
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
}
Expand Down
8 changes: 7 additions & 1 deletion test/sql/iceberg_scan_generated_data_0_001.test
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# name: test/sql/iceberg_scan_generated_data_0_001.test_slow
# name: test/sql/iceberg_scan_generated_data_0_001.test
# description: test iceberg extension with the sf0.001 generated test set
# group: [iceberg]

Expand Down Expand Up @@ -106,3 +106,9 @@ statement error
DESCRIBE SELECT schema_evol_added_col_1 FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table/metadata/v6.metadata.json') ORDER BY uuid;
----
Binder Error

# Check that the explicit cardinality is injected into the parquet scans
query II
EXPLAIN SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table');
----
physical_plan <REGEX>:.* ANTI .*PARQUET_SCAN.*Rows.*Rows.*

0 comments on commit d62d91d

Please sign in to comment.