diff --git a/.gitignore b/.gitignore index cb608ef..592a7fb 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ test/python/__pycache__/ .Rhistory test/sql/tmp.test data/iceberg/generated_* +data/iceberg/generated_*/ +data/iceberg/ scripts/metastore_db/ scripts/derby.log scripts/test-script-with-path.sql diff --git a/scripts/test_data_generator/generate_base_parquet.py b/scripts/test_data_generator/generate_base_parquet.py index 55aad73..87f2837 100755 --- a/scripts/test_data_generator/generate_base_parquet.py +++ b/scripts/test_data_generator/generate_base_parquet.py @@ -46,7 +46,8 @@ l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz, l_comment as l_comment_string, gen_random_uuid()::VARCHAR as uuid, - l_comment::BLOB as l_comment_blob + l_comment::BLOB as l_comment_blob, + l_shipmode as l_shipmode_string FROM lineitem;"""); elif (MODE.lower() == "default"): @@ -67,7 +68,8 @@ l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz, l_comment as l_comment_string, gen_random_uuid()::UUID as uuid, - l_comment::BLOB as l_comment_blob + l_comment::BLOB as l_comment_blob, + l_shipmode as l_shipmode_string FROM lineitem;"""); else: diff --git a/scripts/test_data_generator/generate_iceberg.py b/scripts/test_data_generator/generate_iceberg.py index 5f6afe7..5c09ae6 100755 --- a/scripts/test_data_generator/generate_iceberg.py +++ b/scripts/test_data_generator/generate_iceberg.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/Users/mritchie712/opt/anaconda3/bin/python import pyspark import pyspark.sql import sys @@ -8,7 +8,7 @@ from pathlib import Path if (len(sys.argv) != 4 ): - print("Usage: generate_iceberg.py ") + print("Usage: generate_iceberg.py ") exit(1) SCALE = sys.argv[1] @@ -16,7 +16,7 @@ ICEBERG_SPEC_VERSION = sys.argv[3] PARQUET_SRC_FILE = f'{DEST_PATH}/base_file/file.parquet' -TABLE_NAME = "iceberg_catalog.pyspark_iceberg_table"; +TABLE_NAME = "iceberg_catalog.pyspark_iceberg_table" CWD = os.getcwd() SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -26,7 +26,7 @@ os.system(f"python3 {SCRIPT_DIR}/generate_base_parquet.py {SCALE} {CWD}/{DEST_PATH} spark") ### -### Configure everyone's favorite apache product +### Configure Spark with Iceberg ### conf = pyspark.SparkConf() conf.setMaster('local[*]') @@ -42,23 +42,46 @@ sc.setLogLevel("ERROR") ### -### Create Iceberg table from dataset +### Create Iceberg table from dataset with partitioning ### -spark.read.parquet(PARQUET_SRC_FILE).createOrReplaceTempView('parquet_file_view'); +spark.read.parquet(PARQUET_SRC_FILE).createOrReplaceTempView('parquet_file_view') + +# Define your partition columns and transforms +partition_spec = "year(l_shipdate_date), l_shipmode_string" # Adjust 'l_shipdate_date' as needed if ICEBERG_SPEC_VERSION == '1': - spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}') AS SELECT * FROM parquet_file_view"); + create_table_sql = f""" + CREATE OR REPLACE TABLE {TABLE_NAME} + USING iceberg + PARTITIONED BY ({partition_spec}) + TBLPROPERTIES ( + 'format-version' = '{ICEBERG_SPEC_VERSION}' + ) + AS SELECT * FROM parquet_file_view + """ elif ICEBERG_SPEC_VERSION == '2': - spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}', 'write.update.mode'='merge-on-read') AS SELECT * FROM parquet_file_view"); + create_table_sql = f""" + CREATE OR REPLACE TABLE {TABLE_NAME} + USING iceberg + PARTITIONED BY ({partition_spec}) + TBLPROPERTIES ( + 'format-version' = '{ICEBERG_SPEC_VERSION}', + 'write.update.mode' = 
'merge-on-read' + ) + AS SELECT * FROM parquet_file_view + """ else: print(f"Are you from the future? Iceberg spec version '{ICEBERG_SPEC_VERSION}' is unbeknownst to me") exit(1) +# Execute the CREATE TABLE statement +spark.sql(create_table_sql) + ### ### Apply modifications to base table generating verification results between each step ### update_files = [str(path) for path in Path(f'{SCRIPT_DIR}/updates_v{ICEBERG_SPEC_VERSION}').rglob('*.sql')] -update_files.sort() # Order matters obviously +update_files.sort() # Order matters obviously last_file = "" for path in update_files: @@ -82,7 +105,7 @@ # Create copy of table df = spark.read.table(TABLE_NAME) - df.write.parquet(f"{DEST_PATH}/expected_results/{file_trimmed}/data"); + df.write.parquet(f"{DEST_PATH}/expected_results/{file_trimmed}/data") # For documentation, also write the query we executed to the data query_path = f'{DEST_PATH}/expected_results/{file_trimmed}/query.sql' @@ -90,9 +113,8 @@ f.write("-- The query executed at this step:\n") f.write(query) - ### ### Finally, we copy the latest results to a "final" dir for easy test writing ### import shutil -shutil.copytree(f"{DEST_PATH}/expected_results/{last_file}", f"{DEST_PATH}/expected_results/last") +shutil.copytree(f"{DEST_PATH}/expected_results/{last_file}", f"{DEST_PATH}/expected_results/last") \ No newline at end of file diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 6e63969..784583b 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -80,8 +80,8 @@ vector IcebergTable::ReadManifestEntries(const string &pat } } else { auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA); - avro::DataFileReader dfr(std::move(stream), schema); - c::manifest_entry manifest_entry; + avro::DataFileReader dfr(std::move(stream), schema); + manifest_entry manifest_entry; while (dfr.read(manifest_entry)) { ret.emplace_back(IcebergManifestEntry(manifest_entry)); } diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index 2b3ec86..cc1f406 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -1,31 +1,33 @@ #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" #include "duckdb/common/enums/join_type.hpp" -#include "duckdb/parser/query_node/select_node.hpp" -#include "duckdb/parser/tableref/joinref.hpp" #include "duckdb/common/enums/joinref_type.hpp" #include "duckdb/common/enums/tableref_type.hpp" -#include "duckdb/parser/tableref/table_function_ref.hpp" -#include "duckdb/parser/query_node/recursive_cte_node.hpp" +#include "duckdb/common/file_opener.hpp" +#include "duckdb/common/file_system.hpp" +#include "duckdb/common/printer.hpp" +#include "duckdb/parser/expression/comparison_expression.hpp" +#include "duckdb/parser/expression/conjunction_expression.hpp" #include "duckdb/parser/expression/constant_expression.hpp" #include "duckdb/parser/expression/function_expression.hpp" -#include "duckdb/parser/expression/conjunction_expression.hpp" -#include "duckdb/planner/expression/bound_reference_expression.hpp" -#include "duckdb/parser/expression/comparison_expression.hpp" #include "duckdb/parser/expression/star_expression.hpp" -#include "duckdb/parser/tableref/subqueryref.hpp" +#include "duckdb/parser/query_node/recursive_cte_node.hpp" +#include "duckdb/parser/query_node/select_node.hpp" #include "duckdb/parser/tableref/emptytableref.hpp" -#include 
"duckdb/planner/operator/logical_get.hpp" +#include "duckdb/parser/tableref/joinref.hpp" +#include "duckdb/parser/tableref/subqueryref.hpp" +#include "duckdb/parser/tableref/table_function_ref.hpp" +#include "duckdb/planner/expression/bound_reference_expression.hpp" #include "duckdb/planner/operator/logical_comparison_join.hpp" -#include "duckdb/common/file_opener.hpp" -#include "duckdb/common/file_system.hpp" +#include "duckdb/planner/operator/logical_get.hpp" +#include "iceberg_functions.hpp" #include "iceberg_metadata.hpp" #include "iceberg_utils.hpp" -#include "iceberg_functions.hpp" #include "yyjson.hpp" -#include +#include #include +#include namespace duckdb { @@ -36,6 +38,34 @@ struct IcebergScanGlobalTableFunctionState : public GlobalTableFunctionState { } }; +// === Derived TableFunctionInfo to hold constraints === +struct IcebergTableFunctionInfo : public TableFunctionInfo { + vector> constraints; + + IcebergTableFunctionInfo(vector> &&constraints_p) + : constraints(std::move(constraints_p)) { + } +}; + +// === Helper function to recursively extract comparison predicates from expressions === +static void ExtractPredicates(ParsedExpression &expr, vector> &predicates) { + if (expr.type == ExpressionType::CONJUNCTION_AND) { + auto &conj = (ConjunctionExpression &)expr; + // Access children instead of left and right + if (conj.children.size() >= 2) { + ExtractPredicates(*conj.children[0], predicates); + ExtractPredicates(*conj.children[1], predicates); + } + } else if (expr.type == ExpressionType::COMPARE_EQUAL || expr.type == ExpressionType::COMPARE_GREATERTHAN || + expr.type == ExpressionType::COMPARE_GREATERTHANOREQUALTO || + expr.type == ExpressionType::COMPARE_LESSTHAN || + expr.type == ExpressionType::COMPARE_LESSTHANOREQUALTO) { + // Clone the expression and add to predicates + predicates.emplace_back(expr.Copy()); + } + // Add more conditions here if you want to handle OR or other expressions +} + static unique_ptr GetFilenameExpr(unique_ptr colref_expr) { vector> split_children; split_children.push_back(std::move(colref_expr)); @@ -105,50 +135,301 @@ static Value GetParquetSchemaParam(vector &schema) { for (auto &schema_entry : schema) { child_list_t map_value_children; - map_value_children.push_back(make_pair("name", Value(schema_entry.name))); - map_value_children.push_back(make_pair("type", Value(schema_entry.type.ToString()))); - map_value_children.push_back(make_pair("default_value", schema_entry.default_value)); + map_value_children.emplace_back(make_pair("name", Value(schema_entry.name))); + map_value_children.emplace_back(make_pair("type", Value(schema_entry.type.ToString()))); + map_value_children.emplace_back(make_pair("default_value", schema_entry.default_value)); auto map_value = Value::STRUCT(map_value_children); child_list_t map_entry_children; - map_entry_children.push_back(make_pair("key", schema_entry.id)); - map_entry_children.push_back(make_pair("values", map_value)); + map_entry_children.emplace_back(make_pair("key", schema_entry.id)); + map_entry_children.emplace_back(make_pair("values", map_value)); // Updated key to "values" auto map_entry = Value::STRUCT(map_entry_children); map_entries.push_back(map_entry); } - auto param_type = - LogicalType::STRUCT({{"key", LogicalType::INTEGER}, - {"value", LogicalType::STRUCT({{{"name", LogicalType::VARCHAR}, - {"type", LogicalType::VARCHAR}, - {"default_value", LogicalType::VARCHAR}}})}}); + auto param_type = LogicalType::STRUCT({{"key", LogicalType::INTEGER}, + {"value", LogicalType::STRUCT({{"name", 
LogicalType::VARCHAR}, + {"type", LogicalType::VARCHAR}, + {"default_value", LogicalType::VARCHAR}})}}); auto ret = Value::MAP(param_type, map_entries); return ret; } +// Utility function to convert byte vector to hex string for logging +static std::string ByteArrayToHexString(const std::vector &bytes) { + std::ostringstream oss; + for (auto byte : bytes) { + oss << std::hex << std::setw(2) << std::setfill('0') << (int)byte; + } + return oss.str(); +} + +static Value DeserializeBound(const std::vector &bound_value, const LogicalType &type) { + Value deserialized_value; + try { + switch (type.id()) { + case LogicalTypeId::INTEGER: { + if (bound_value.size() < sizeof(int32_t)) { + throw std::runtime_error("Invalid bound size for INTEGER type"); + } + int32_t val; + std::memcpy(&val, bound_value.data(), sizeof(int32_t)); + deserialized_value = Value::INTEGER(val); + break; + } + case LogicalTypeId::BIGINT: { + if (bound_value.size() < sizeof(int64_t)) { + throw std::runtime_error("Invalid bound size for BIGINT type"); + } + int64_t val; + std::memcpy(&val, bound_value.data(), sizeof(int64_t)); + deserialized_value = Value::BIGINT(val); + break; + } + case LogicalTypeId::DATE: { + if (bound_value.size() < sizeof(int32_t)) { // Dates are typically stored as int32 (days since epoch) + throw std::runtime_error("Invalid bound size for DATE type"); + } + int32_t days_since_epoch; + std::memcpy(&days_since_epoch, bound_value.data(), sizeof(int32_t)); + // Convert to DuckDB date + date_t date = Date::EpochDaysToDate(days_since_epoch); + deserialized_value = Value::DATE(date); + break; + } + case LogicalTypeId::TIMESTAMP: { + if (bound_value.size() < + sizeof(int64_t)) { // Timestamps are typically stored as int64 (microseconds since epoch) + throw std::runtime_error("Invalid bound size for TIMESTAMP type"); + } + int64_t micros_since_epoch; + std::memcpy(µs_since_epoch, bound_value.data(), sizeof(int64_t)); + // Convert to DuckDB timestamp using microseconds + timestamp_t timestamp = Timestamp::FromEpochMicroSeconds(micros_since_epoch); + deserialized_value = Value::TIMESTAMP(timestamp); + break; + } + case LogicalTypeId::TIMESTAMP_TZ: { // Added support for TIMESTAMP WITH TIME ZONE + if (bound_value.size() < sizeof(int64_t)) { // Assuming stored as int64 (microseconds since epoch) + throw std::runtime_error("Invalid bound size for TIMESTAMP_TZ type"); + } + int64_t micros_since_epoch; + std::memcpy(µs_since_epoch, bound_value.data(), sizeof(int64_t)); + // Convert to DuckDB timestamp using microseconds + timestamp_t timestamp = Timestamp::FromEpochMicroSeconds(micros_since_epoch); + // Create a TIMESTAMPTZ Value + deserialized_value = Value::TIMESTAMPTZ(timestamp); + break; + } + case LogicalTypeId::DOUBLE: { + if (bound_value.size() < sizeof(double)) { + throw std::runtime_error("Invalid bound size for DOUBLE type"); + } + double val; + std::memcpy(&val, bound_value.data(), sizeof(double)); + deserialized_value = Value::DOUBLE(val); + break; + } + case LogicalTypeId::VARCHAR: { + // Assume the bytes represent a UTF-8 string + std::string str(bound_value.begin(), bound_value.end()); + deserialized_value = Value(str); + break; + } + // Add more types as needed + default: + throw std::runtime_error("Unsupported type for DeserializeBound"); + } + + // Log the final deserialized value + } catch (const std::exception &e) { + std::cout << " Error during deserialization: " << e.what() << std::endl; + // Depending on your error handling strategy, you might want to rethrow or handle it here + throw; + } + + 
return deserialized_value; +} + +static bool EvaluatePredicateAgainstStatistics(const IcebergManifestEntry &entry, + const vector> &predicates, + const std::vector &schema) { + // Create a mapping from column names to field IDs and their LogicalTypes + std::unordered_map> column_to_field_info; + for (const auto &col_def : schema) { + column_to_field_info[col_def.name] = {col_def.id, col_def.type}; // Assuming col_def.type is LogicalType + } + + for (const auto &predicate : predicates) { + if (auto comparison = dynamic_cast(predicate.get())) { + // Assume predicates are on columns, possibly transformed + std::string column_name; + if (auto colref = dynamic_cast(comparison->left.get())) { + column_name = colref->GetColumnName(); + } else { + // Unsupported predicate structure + continue; + } + + // Retrieve field ID and type + auto it = column_to_field_info.find(column_name); + if (it == column_to_field_info.end()) { + // Column not found in schema, cannot evaluate predicate + continue; + } + int field_id = it->second.first; + LogicalType field_type = it->second.second; + + // Convert field_id to string for lookup + std::string field_id_str = std::to_string(field_id); + + // Get lower and upper bounds + auto lower_it = entry.lower_bounds.find(field_id_str); + auto upper_it = entry.upper_bounds.find(field_id_str); + + if (lower_it == entry.lower_bounds.end() || upper_it == entry.upper_bounds.end()) { + continue; // Cannot filter based on missing bounds + } + + // Deserialize bounds + Value lower_bound, upper_bound; + try { + lower_bound = DeserializeBound(lower_it->second, field_type); + upper_bound = DeserializeBound(upper_it->second, field_type); + } catch (const std::exception &e) { + continue; + } + + // Extract the constant value from the predicate + Value constant_value; + if (auto const_expr = dynamic_cast(comparison->right.get())) { + constant_value = const_expr->value; + } else { + // Unsupported predicate structure + continue; + } + // fprintf(stderr, " Lower bound: %s\n", lower_bound.ToString().c_str()); + // fprintf(stderr, " Upper bound: %s\n", upper_bound.ToString().c_str()); + + // Evaluate the predicate against the bounds + bool result = true; + switch (comparison->type) { + case ExpressionType::COMPARE_EQUAL: + result = (constant_value >= lower_bound && constant_value <= upper_bound); + break; + case ExpressionType::COMPARE_GREATERTHAN: + result = (constant_value <= upper_bound); + break; + case ExpressionType::COMPARE_GREATERTHANOREQUALTO: + result = (constant_value <= upper_bound); + break; + case ExpressionType::COMPARE_LESSTHAN: + result = (constant_value >= lower_bound); + break; + case ExpressionType::COMPARE_LESSTHANOREQUALTO: + result = (constant_value >= lower_bound); + break; + default: + // For other types of comparisons, we can't make a decision based on bounds + result = true; // Conservative approach + break; + } + if (!result) { + return false; // If any predicate fails, exclude the file + } + } + } + return true; // All predicates passed +} + //! 
Build the Parquet Scan expression for the files we need to scan -static unique_ptr MakeScanExpression(vector &data_file_values, vector &delete_file_values, - vector &schema, bool allow_moved_paths, - string metadata_compression_codec, bool skip_schema_inference, - int64_t data_cardinality, int64_t delete_cardinality) { - - auto cardinality = make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), - make_uniq(Value(data_cardinality))); - - // No deletes, just return a TableFunctionRef for a parquet scan of the data files +static unique_ptr +MakeScanExpression(const string &iceberg_path, FileSystem &fs, vector &data_file_entries, + vector &delete_file_values, vector &schema, bool allow_moved_paths, + string metadata_compression_codec, bool skip_schema_inference, const IcebergTable &iceberg_table, + int64_t data_cardinality, int64_t delete_cardinality, + const IcebergTableFunctionInfo *iceberg_info = nullptr) { + + // Filter data files based on predicates + vector filtered_data_file_values; + if (iceberg_info && !iceberg_info->constraints.empty()) { + for (const auto &entry : data_file_entries) { + if (EvaluatePredicateAgainstStatistics(entry, iceberg_info->constraints, schema)) { + auto full_path = + allow_moved_paths ? IcebergUtils::GetFullPath(iceberg_path, entry.file_path, fs) : entry.file_path; + filtered_data_file_values.emplace_back(full_path); + } + } + } else { + for (const auto &entry : data_file_entries) { + auto full_path = + allow_moved_paths ? IcebergUtils::GetFullPath(iceberg_path, entry.file_path, fs) : entry.file_path; + filtered_data_file_values.emplace_back(full_path); + } + } + // fprintf(stderr, "Total number of data files: %zu\n", data_file_entries.size()); + // fprintf(stderr, "Total number of filtered data files: %zu\n", filtered_data_file_values.size()); + // fprintf(stderr, "Total number of delete files: %zu\n", delete_file_values.size()); + + auto cardinality = make_uniq(ExpressionType::COMPARE_EQUAL, + make_uniq("explicit_cardinality"), + make_uniq(Value(data_cardinality))); + + // Handle the scenario with no data files + if (filtered_data_file_values.empty()) { + // **BEGIN: Handling Empty Filtered Data Files** + auto select_node = make_uniq(); + select_node->where_clause = make_uniq(Value::BOOLEAN(false)); + + // Add select expressions for each column based on the schema + for (const auto &col : schema) { + // Create a NULL constant of the appropriate type + auto null_expr = make_uniq(Value(col.type)); + // Alias it to the column name + null_expr->alias = col.name; + select_node->select_list.emplace_back(std::move(null_expr)); + } + + // **Add the FROM clause as EmptyTableRef** + select_node->from_table = make_uniq(); + + // Create a SelectStatement + auto select_statement = make_uniq(); + select_statement->node = std::move(select_node); + + // Create a SubqueryRef with the SelectStatement + auto table_ref_empty = make_uniq(std::move(select_statement), "empty_scan"); + + return std::move(table_ref_empty); + // **END: Handling Empty Filtered Data Files** + } + + // Handle the scenario with no delete files if (delete_file_values.empty()) { auto table_function_ref_data = make_uniq(); table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; - left_children.push_back(make_uniq(Value::LIST(data_file_values))); + left_children.emplace_back(make_uniq(Value::LIST(filtered_data_file_values))); left_children.push_back(std::move(cardinality)); - if (!skip_schema_inference) { - left_children.push_back( - 
make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); + + // Add cardinality condition if available + int64_t data_cardinality = 0; + for (const auto &entry : data_file_entries) { + if (entry.status != IcebergManifestEntryStatusType::DELETED && + entry.content == IcebergManifestEntryContentType::DATA) { + data_cardinality += entry.record_count; + } } + left_children.emplace_back(make_uniq( + ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), + make_uniq(Value(data_cardinality)))); + if (!skip_schema_inference) { + left_children.emplace_back( + make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), + make_uniq(GetParquetSchemaParam(schema)))); + } table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); return std::move(table_function_ref_data); } @@ -172,7 +453,7 @@ static unique_ptr MakeScanExpression(vector &data_file_values, auto table_function_ref_data = make_uniq(); table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; - left_children.push_back(make_uniq(Value::LIST(data_file_values))); + left_children.push_back(make_uniq(Value::LIST(filtered_data_file_values))); left_children.push_back(std::move(cardinality)); left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("filename"), @@ -181,9 +462,9 @@ static unique_ptr MakeScanExpression(vector &data_file_values, make_uniq("file_row_number"), make_uniq(Value(1)))); if (!skip_schema_inference) { - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); + left_children.emplace_back( + make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), + make_uniq(GetParquetSchemaParam(schema)))); } table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); join_node->left = std::move(table_function_ref_data); @@ -192,9 +473,10 @@ static unique_ptr MakeScanExpression(vector &data_file_values, auto table_function_ref_deletes = make_uniq(); table_function_ref_deletes->alias = "iceberg_scan_deletes"; vector> right_children; - right_children.push_back(make_uniq(Value::LIST(delete_file_values))); - right_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), - make_uniq(Value(delete_cardinality)))); + right_children.emplace_back(make_uniq(Value::LIST(delete_file_values))); + right_children.emplace_back(make_uniq( + ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), + make_uniq(Value(delete_cardinality)))); table_function_ref_deletes->function = make_uniq("parquet_scan", std::move(right_children)); join_node->right = std::move(table_function_ref_deletes); @@ -207,10 +489,11 @@ static unique_ptr MakeScanExpression(vector &data_file_values, auto select_expr = make_uniq(); select_expr->exclude_list = {"filename", "file_row_number"}; vector> select_exprs; - select_exprs.push_back(std::move(select_expr)); + select_exprs.emplace_back(std::move(select_expr)); select_node->select_list = std::move(select_exprs); select_statement->node = std::move(select_node); + // fprintf(stderr, "Final SQL statement:\n%s\n", select_statement->ToString().c_str()); return make_uniq(std::move(select_statement), "iceberg_scan"); } @@ -219,7 +502,7 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table auto iceberg_path = input.inputs[0].ToString(); // Enabling this will ensure the ANTI Join with the deletes only looks at filenames, instead of full paths 
- // this allows hive tables to be moved and have mismatching paths, usefull for testing, but will have worse + // this allows hive tables to be moved and have mismatching paths, useful for testing, but will have worse // performance bool allow_moved_paths = false; bool skip_schema_inference = false; @@ -248,41 +531,73 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table version_name_format = StringValue::Get(kv.second); } } - auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format); + auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, + table_version, version_name_format); + IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = + IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), + metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), + metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, + skip_schema_inference); } - IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec); - auto data_files = iceberg_table.GetPaths(); + IcebergTable iceberg_table = + IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec); + + auto data_entries = iceberg_table.GetEntries(); auto delete_files = iceberg_table.GetPaths(); vector data_file_values; - for (auto &data_file : data_files) { - data_file_values.push_back( - {allow_moved_paths ? IcebergUtils::GetFullPath(iceberg_path, data_file, fs) : data_file}); - } + vector delete_file_values; for (auto &delete_file : delete_files) { - delete_file_values.push_back( - {allow_moved_paths ? IcebergUtils::GetFullPath(iceberg_path, delete_file, fs) : delete_file}); + auto full_path = allow_moved_paths ? 
IcebergUtils::GetFullPath(iceberg_path, delete_file, fs) : delete_file; + delete_file_values.emplace_back(full_path); } + // === Extract predicates from input.binder === + vector> extracted_predicates; + if (input.binder) { + + // Access the where_clause from the binder + auto statement = input.binder->GetRootStatement(); + if (statement && statement->type == StatementType::SELECT_STATEMENT) { + auto &select_statement = (SelectStatement &)*statement; + if (select_statement.node->type == QueryNodeType::SELECT_NODE) { + auto &select_node = (SelectNode &)*select_statement.node; + if (select_node.where_clause) { + ExtractPredicates(*select_node.where_clause, extracted_predicates); + } + } + } + } + + // Create IcebergTableFunctionInfo with extracted predicates + auto iceberg_info = make_uniq(std::move(extracted_predicates)); + input.info = iceberg_info.release(); // Assign raw pointer + + // Handle 'mode' and integrate predicate pushdown if (mode == "list_files") { + for (const auto &entry : data_entries) { + auto full_path = + allow_moved_paths ? IcebergUtils::GetFullPath(iceberg_path, entry.file_path, fs) : entry.file_path; + data_file_values.emplace_back(full_path); + } return MakeListFilesExpression(data_file_values, delete_file_values); } else if (mode == "default") { int64_t data_cardinality = 0, delete_cardinality = 0; - for(auto &manifest : iceberg_table.entries) { - for(auto &entry : manifest.manifest_entries) { + for (auto &manifest : iceberg_table.entries) { + for (auto &entry : manifest.manifest_entries) { if (entry.status != IcebergManifestEntryStatusType::DELETED) { if (entry.content == IcebergManifestEntryContentType::DATA) { data_cardinality += entry.record_count; @@ -292,7 +607,15 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table } } } - return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality); + + IcebergTableFunctionInfo *iceberg_info_cast = dynamic_cast(input.info.get()); + if (!iceberg_info_cast) { + throw std::bad_cast(); // Handle the error appropriately + } + + return MakeScanExpression(iceberg_path, fs, data_entries, delete_file_values, snapshot_to_scan.schema, + allow_moved_paths, metadata_compression_codec, skip_schema_inference, iceberg_table, + data_cardinality, delete_cardinality, iceberg_info_cast); } else { throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'"); } @@ -336,4 +659,4 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { return function_set; } -} // namespace duckdb +} // namespace duckdb \ No newline at end of file diff --git a/src/iceberg_functions/iceberg_utils.cpp b/src/iceberg_functions/iceberg_utils.cpp new file mode 100644 index 0000000..eb41c6f --- /dev/null +++ b/src/iceberg_functions/iceberg_utils.cpp @@ -0,0 +1,6 @@ +string IcebergUtils::GetFullPath(const string &base_path, const string &relative_path, FileSystem &fs) { + if (fs.IsAbsolutePath(relative_path)) { + return relative_path; + } + return fs.JoinPath(base_path, relative_path); +} \ No newline at end of file diff --git a/src/include/avro_codegen/iceberg_manifest_entry_partial.hpp b/src/include/avro_codegen/iceberg_manifest_entry_partial.hpp index e38d4b8..a8a4557 100644 --- a/src/include/avro_codegen/iceberg_manifest_entry_partial.hpp +++ b/src/include/avro_codegen/iceberg_manifest_entry_partial.hpp @@ -1,124 +1,380 @@ /** -* Licensed to the Apache Software Foundation (ASF) 
under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* https://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - - -#ifndef CPX2_HH_2561633724__H_ -#define CPX2_HH_2561633724__H_ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* This code was generated by avrogencpp 1.13.0-SNAPSHOT. Do not edit.*/ + +#ifndef MANIFEST_ENTRY_HH_2043678367_H +#define MANIFEST_ENTRY_HH_2043678367_H -#include -#include "boost/any.hpp" -#include "avro/Specific.hh" -#include "avro/Encoder.hh" #include "avro/Decoder.hh" +#include "avro/Encoder.hh" +#include "avro/Specific.hh" +#include "boost/any.hpp" + +#include +#include -namespace c { -struct data_file { - int32_t content; - std::string file_path; - std::string file_format; - int64_t record_count; - data_file() : - content(int32_t()), - file_path(std::string()), - file_format(std::string()), - record_count(int64_t()) - { } +struct k126_v127 { + int32_t key; + std::vector value; + k126_v127() : key(int32_t()), value(std::vector()) { + } +}; + +struct manifest_entry_json_Union__0__ { +private: + size_t idx_; + boost::any value_; + +public: + /** enum representing union branches as returned by the idx() function */ + enum class Branch : size_t { + null = 0, + array = 1, + }; + size_t idx() const { + return idx_; + } + Branch branch() const { + return static_cast(idx_); + } + bool is_null() const { + return (idx_ == 0); + } + void set_null() { + idx_ = 0; + value_ = boost::any(); + } + const std::vector &get_array() const; + std::vector &get_array(); + void set_array(const std::vector &v); + void set_array(std::vector &&v); + manifest_entry_json_Union__0__(); +}; + +struct k129_v130 { + int32_t key; + std::vector value; + k129_v130() : key(int32_t()), value(std::vector()) { + } +}; + +struct manifest_entry_json_Union__1__ { +private: + size_t idx_; + boost::any value_; + +public: + /** enum representing union branches as returned by the idx() function */ + enum class Branch : size_t { + null = 0, + array = 1, + }; + size_t idx() const { + return idx_; + } + Branch branch() const { + return static_cast(idx_); + } + bool is_null() const { + return (idx_ == 0); + } + void set_null() { + idx_ = 
0; + value_ = boost::any(); + } + const std::vector &get_array() const; + std::vector &get_array(); + void set_array(const std::vector &v); + void set_array(std::vector &&v); + manifest_entry_json_Union__1__(); +}; + +struct r2 { + typedef manifest_entry_json_Union__0__ lower_bounds_t; + typedef manifest_entry_json_Union__1__ upper_bounds_t; + int32_t content; + std::string file_path; + std::string file_format; + int64_t record_count; + lower_bounds_t lower_bounds; + upper_bounds_t upper_bounds; + r2() + : content(int32_t()), file_path(std::string()), file_format(std::string()), record_count(int64_t()), + lower_bounds(lower_bounds_t()), upper_bounds(upper_bounds_t()) { + } }; struct manifest_entry { - int32_t status; - data_file data_file_; // NOTE: as generated, this is called data_file, but this causes issues with GCC - manifest_entry() : - status(int32_t()), - data_file_() - { } + int32_t status; + r2 data_file; + manifest_entry() : status(int32_t()), data_file(r2()) { + } }; +inline const std::vector &manifest_entry_json_Union__0__::get_array() const { + if (idx_ != 1) { + throw avro::Exception("Invalid type for union manifest_entry_json_Union__0__"); + } + return *boost::any_cast>(&value_); +} + +inline std::vector &manifest_entry_json_Union__0__::get_array() { + if (idx_ != 1) { + throw avro::Exception("Invalid type for union manifest_entry_json_Union__0__"); + } + return *boost::any_cast>(&value_); +} + +inline void manifest_entry_json_Union__0__::set_array(const std::vector &v) { + idx_ = 1; + value_ = v; +} + +inline void manifest_entry_json_Union__0__::set_array(std::vector &&v) { + idx_ = 1; + value_ = std::move(v); +} + +inline const std::vector &manifest_entry_json_Union__1__::get_array() const { + if (idx_ != 1) { + throw avro::Exception("Invalid type for union manifest_entry_json_Union__1__"); + } + return *boost::any_cast>(&value_); +} + +inline std::vector &manifest_entry_json_Union__1__::get_array() { + if (idx_ != 1) { + throw avro::Exception("Invalid type for union manifest_entry_json_Union__1__"); + } + return *boost::any_cast>(&value_); +} + +inline void manifest_entry_json_Union__1__::set_array(const std::vector &v) { + idx_ = 1; + value_ = v; +} + +inline void manifest_entry_json_Union__1__::set_array(std::vector &&v) { + idx_ = 1; + value_ = std::move(v); +} + +inline manifest_entry_json_Union__0__::manifest_entry_json_Union__0__() : idx_(0) { +} +inline manifest_entry_json_Union__1__::manifest_entry_json_Union__1__() : idx_(0) { } namespace avro { -template<> struct codec_traits { - static void encode(Encoder& e, const c::data_file& v) { - avro::encode(e, v.content); - avro::encode(e, v.file_path); - avro::encode(e, v.file_format); - avro::encode(e, v.record_count); - } - static void decode(Decoder& d, c::data_file& v) { - if (avro::ResolvingDecoder *rd = - dynamic_cast(&d)) { - const std::vector fo = rd->fieldOrder(); - for (std::vector::const_iterator it = fo.begin(); - it != fo.end(); ++it) { - switch (*it) { - case 0: - avro::decode(d, v.content); - break; - case 1: - avro::decode(d, v.file_path); - break; - case 2: - avro::decode(d, v.file_format); - break; - case 3: - avro::decode(d, v.record_count); - break; - default: - break; - } - } - } else { - avro::decode(d, v.content); - avro::decode(d, v.file_path); - avro::decode(d, v.file_format); - avro::decode(d, v.record_count); - } - } +template <> +struct codec_traits { + static void encode(Encoder &e, const k126_v127 &v) { + avro::encode(e, v.key); + avro::encode(e, v.value); + } + static void decode(Decoder 
&d, k126_v127 &v) { + if (avro::ResolvingDecoder *rd = dynamic_cast(&d)) { + const std::vector fo = rd->fieldOrder(); + for (std::vector::const_iterator it = fo.begin(); it != fo.end(); ++it) { + switch (*it) { + case 0: + avro::decode(d, v.key); + break; + case 1: + avro::decode(d, v.value); + break; + default: + break; + } + } + } else { + avro::decode(d, v.key); + avro::decode(d, v.value); + } + } }; -template<> struct codec_traits { - static void encode(Encoder& e, const c::manifest_entry& v) { - avro::encode(e, v.status); - avro::encode(e, v.data_file_); - } - static void decode(Decoder& d, c::manifest_entry& v) { - if (avro::ResolvingDecoder *rd = - dynamic_cast(&d)) { - const std::vector fo = rd->fieldOrder(); - for (std::vector::const_iterator it = fo.begin(); - it != fo.end(); ++it) { - switch (*it) { - case 0: - avro::decode(d, v.status); - break; - case 1: - avro::decode(d, v.data_file_); - break; - default: - break; - } - } - } else { - avro::decode(d, v.status); - avro::decode(d, v.data_file_); - } - } +template <> +struct codec_traits { + static void encode(Encoder &e, manifest_entry_json_Union__0__ v) { + e.encodeUnionIndex(v.idx()); + switch (v.idx()) { + case 0: + e.encodeNull(); + break; + case 1: + avro::encode(e, v.get_array()); + break; + } + } + static void decode(Decoder &d, manifest_entry_json_Union__0__ &v) { + size_t n = d.decodeUnionIndex(); + if (n >= 2) { + throw avro::Exception("Union index too big"); + } + switch (n) { + case 0: + d.decodeNull(); + v.set_null(); + break; + case 1: { + std::vector vv; + avro::decode(d, vv); + v.set_array(std::move(vv)); + } break; + } + } }; -} -#endif +template <> +struct codec_traits { + static void encode(Encoder &e, const k129_v130 &v) { + avro::encode(e, v.key); + avro::encode(e, v.value); + } + static void decode(Decoder &d, k129_v130 &v) { + if (avro::ResolvingDecoder *rd = dynamic_cast(&d)) { + const std::vector fo = rd->fieldOrder(); + for (std::vector::const_iterator it = fo.begin(); it != fo.end(); ++it) { + switch (*it) { + case 0: + avro::decode(d, v.key); + break; + case 1: + avro::decode(d, v.value); + break; + default: + break; + } + } + } else { + avro::decode(d, v.key); + avro::decode(d, v.value); + } + } +}; + +template <> +struct codec_traits { + static void encode(Encoder &e, manifest_entry_json_Union__1__ v) { + e.encodeUnionIndex(v.idx()); + switch (v.idx()) { + case 0: + e.encodeNull(); + break; + case 1: + avro::encode(e, v.get_array()); + break; + } + } + static void decode(Decoder &d, manifest_entry_json_Union__1__ &v) { + size_t n = d.decodeUnionIndex(); + if (n >= 2) { + throw avro::Exception("Union index too big"); + } + switch (n) { + case 0: + d.decodeNull(); + v.set_null(); + break; + case 1: { + std::vector vv; + avro::decode(d, vv); + v.set_array(std::move(vv)); + } break; + } + } +}; + +template <> +struct codec_traits { + static void encode(Encoder &e, const r2 &v) { + avro::encode(e, v.content); + avro::encode(e, v.file_path); + avro::encode(e, v.file_format); + avro::encode(e, v.record_count); + avro::encode(e, v.lower_bounds); + avro::encode(e, v.upper_bounds); + } + static void decode(Decoder &d, r2 &v) { + if (avro::ResolvingDecoder *rd = dynamic_cast(&d)) { + const std::vector fo = rd->fieldOrder(); + for (std::vector::const_iterator it = fo.begin(); it != fo.end(); ++it) { + switch (*it) { + case 0: + avro::decode(d, v.content); + break; + case 1: + avro::decode(d, v.file_path); + break; + case 2: + avro::decode(d, v.file_format); + break; + case 3: + avro::decode(d, v.record_count); 
+ break; + case 4: + avro::decode(d, v.lower_bounds); + break; + case 5: + avro::decode(d, v.upper_bounds); + break; + default: + break; + } + } + } else { + avro::decode(d, v.content); + avro::decode(d, v.file_path); + avro::decode(d, v.file_format); + avro::decode(d, v.record_count); + avro::decode(d, v.lower_bounds); + avro::decode(d, v.upper_bounds); + } + } +}; + +template <> +struct codec_traits { + static void encode(Encoder &e, const manifest_entry &v) { + avro::encode(e, v.status); + avro::encode(e, v.data_file); + } + static void decode(Decoder &d, manifest_entry &v) { + if (avro::ResolvingDecoder *rd = dynamic_cast(&d)) { + const std::vector fo = rd->fieldOrder(); + for (std::vector::const_iterator it = fo.begin(); it != fo.end(); ++it) { + switch (*it) { + case 0: + avro::decode(d, v.status); + break; + case 1: + avro::decode(d, v.data_file); + break; + default: + break; + } + } + } else { + avro::decode(d, v.status); + avro::decode(d, v.data_file); + } + } +}; + +} // namespace avro +#endif \ No newline at end of file diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index d7d4478..cbfae6a 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -113,6 +113,24 @@ struct IcebergTable { return ret; } + //! Returns all IcebergManifestEntry objects to be scanned for the given IcebergManifestContentType + template + vector GetEntries() { + vector ret; + for (auto &entry : entries) { + if (entry.manifest.content != TYPE) { + continue; + } + for (auto &manifest_entry : entry.manifest_entries) { + if (manifest_entry.status == IcebergManifestEntryStatusType::DELETED) { + continue; + } + ret.push_back(manifest_entry); + } + } + return ret; + } + void Print() { Printer::Print("Iceberg table (" + path + ")"); for (auto &entry : entries) { diff --git a/src/include/iceberg_types.hpp b/src/include/iceberg_types.hpp index 16ec1ca..f6942c4 100644 --- a/src/include/iceberg_types.hpp +++ b/src/include/iceberg_types.hpp @@ -13,6 +13,8 @@ #include "avro_codegen/iceberg_manifest_file_partial.hpp" #include "avro_codegen/iceberg_manifest_file_partial_v1.hpp" +#include // Add this line for std::setfill and std::setw + namespace duckdb { enum class IcebergManifestContentType : uint8_t { @@ -69,12 +71,12 @@ static string MANIFEST_SCHEMA = "{\n" // Schema for v1, sequence_number and content are not present there static string MANIFEST_SCHEMA_V1 = "{\n" - " \"type\": \"record\",\n" - " \"name\": \"manifest_file\",\n" - " \"fields\" : [\n" - " {\"name\": \"manifest_path\", \"type\": \"string\"}\n" - " ]\n" - " }"; + " \"type\": \"record\",\n" + " \"name\": \"manifest_file\",\n" + " \"fields\" : [\n" + " {\"name\": \"manifest_path\", \"type\": \"string\"}\n" + " ]\n" + " }"; //! An entry in the manifest list file (top level AVRO file) struct IcebergManifest { @@ -119,50 +121,141 @@ struct IcebergManifest { //! The schema containing the fields from the manifest entry. //! 
this schema should match the generated cpp header from src/include/avro_codegen/iceberg_manifest_entry_partial.hpp -static string MANIFEST_ENTRY_SCHEMA = "{\n" - " \"type\": \"record\",\n" - " \"name\": \"manifest_entry\",\n" - " \"fields\" : [\n" - " {\"name\": \"status\", \"type\" : \"int\"},\n" - " {\"name\": \"data_file\", \"type\": {\n" - " \"type\": \"record\",\n" - " \"name\": \"r2\",\n" - " \"fields\" : [\n" - " {\"name\": \"content\", \"type\": \"int\"},\n" - " {\"name\": \"file_path\", \"type\": \"string\"},\n" - " {\"name\": \"file_format\", \"type\": \"string\"},\n" - " {\"name\": \"record_count\", \"type\" : \"long\"}\n" - " ]}\n" - " }\n" - " ]\n" - " }"; +// static string MANIFEST_ENTRY_SCHEMA = "{\n" +// " \"type\": \"record\",\n" +// " \"name\": \"manifest_entry\",\n" +// " \"fields\" : [\n" +// " {\"name\": \"status\", \"type\" : \"int\"},\n" +// " {\"name\": \"data_file\", \"type\": {\n" +// " \"type\": \"record\",\n" +// " \"name\": \"r2\",\n" +// " \"fields\" : [\n" +// " {\"name\": \"content\", \"type\": \"int\"},\n" +// " {\"name\": \"file_path\", \"type\": \"string\"},\n" +// " {\"name\": \"file_format\", \"type\": \"string\"},\n" +// " {\"name\": \"record_count\", \"type\" : \"long\"},\n" +// " {\"name\": \"lower_bounds\", \"type\": [\"null\", {\n" +// " \"type\": \"array\",\n" +// " \"items\": {\n" +// " \"type\": \"record\",\n" +// " \"name\": \"k126_v127\",\n" +// " \"fields\": [\n" +// " {\"name\": \"key\", \"type\": \"int\"},\n" +// " {\"name\": \"value\", \"type\": \"bytes\"}\n" +// " ]\n" +// " }\n" +// " }], \"default\": null},\n" +// " {\"name\": \"upper_bounds\", \"type\": [\"null\", {\n" +// " \"type\": \"array\",\n" +// " \"items\": {\n" +// " \"type\": \"record\",\n" +// " \"name\": \"k129_v130\",\n" +// " \"fields\": [\n" +// " {\"name\": \"key\", \"type\": \"int\"},\n" +// " {\"name\": \"value\", \"type\": \"bytes\"}\n" +// " ]\n" +// " }\n" +// " }], \"default\": null}\n" +// " ]}\n" +// " }\n" +// " ]\n" +// " }"; -static string MANIFEST_ENTRY_SCHEMA_V1 = "{\n" - " \"type\": \"record\",\n" - " \"name\": \"manifest_entry\",\n" - " \"fields\" : [\n" - " {\"name\": \"status\", \"type\" : \"int\"},\n" - " {\"name\": \"data_file\", \"type\": {\n" - " \"type\": \"record\",\n" - " \"name\": \"r2\",\n" - " \"fields\" : [\n" - " {\"name\": \"file_path\", \"type\": \"string\"},\n" - " {\"name\": \"file_format\", \"type\": \"string\"},\n" - " {\"name\": \"record_count\", \"type\" : \"long\"}\n" - " ]}\n" - " }\n" - " ]\n" - " }"; +static string MANIFEST_ENTRY_SCHEMA = R"( +{ + "type": "record", + "name": "manifest_entry", + "fields": [ + { "name": "status", "type": "int", "field-id": 0 }, + { "name": "data_file", "type": { + "type": "record", + "name": "r2", + "fields": [ + { "name": "content", "type": "int", "field-id": 134 }, + { "name": "file_path", "type": "string", "field-id": 100 }, + { "name": "file_format", "type": "string", "field-id": 101 }, + { "name": "record_count", "type": "long", "field-id": 103 }, + { "name": "lower_bounds", "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k126_v127", + "fields": [ + { "name": "key", "type": "int", "field-id": 126 }, + { "name": "value", "type": "bytes", "field-id": 127 } + ] + } + } + ], + "default": null, + "field-id": 125 + }, + { "name": "upper_bounds", "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k129_v130", + "fields": [ + { "name": "key", "type": "int", "field-id": 129 }, + { "name": "value", "type": "bytes", "field-id": 130 
} + ] + } + } + ], "default": null, "field-id": 128 } + ] + }, "field-id": 2 } + ] +} +)"; +static string MANIFEST_ENTRY_SCHEMA_V1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"manifest_entry\",\n" + " \"fields\" : [\n" + " {\"name\": \"status\", \"type\" : \"int\"},\n" + " {\"name\": \"data_file\", \"type\": {\n" + " \"type\": \"record\",\n" + " \"name\": \"r2\",\n" + " \"fields\" : [\n" + " {\"name\": \"file_path\", \"type\": \"string\"},\n" + " {\"name\": \"file_format\", \"type\": \"string\"},\n" + " {\"name\": \"record_count\", \"type\" : \"long\"}\n" + " ]}\n" + " }\n" + " ]\n" + " }"; //! An entry in a manifest file struct IcebergManifestEntry { - explicit IcebergManifestEntry(const c::manifest_entry &schema) { + explicit IcebergManifestEntry(const manifest_entry &schema) { status = (IcebergManifestEntryStatusType)schema.status; - content = (IcebergManifestEntryContentType)schema.data_file_.content; - file_path = schema.data_file_.file_path; - file_format = schema.data_file_.file_format; - record_count = schema.data_file_.record_count; + content = (IcebergManifestEntryContentType)schema.data_file.content; + const auto &data_file = schema.data_file; + file_path = data_file.file_path; + file_format = data_file.file_format; + record_count = data_file.record_count; + lower_bounds.clear(); + upper_bounds.clear(); + + // Handle lower_bounds + if (data_file.lower_bounds.idx() == static_cast(manifest_entry_json_Union__0__::Branch::array)) { + const auto &bounds_array = data_file.lower_bounds.get_array(); + for (const auto &lb : bounds_array) { + lower_bounds[std::to_string(lb.key)] = lb.value; + } + } + + // Handle upper_bounds + if (data_file.upper_bounds.idx() == static_cast(manifest_entry_json_Union__1__::Branch::array)) { + const auto &bounds_array = data_file.upper_bounds.get_array(); + for (const auto &ub : bounds_array) { + upper_bounds[std::to_string(ub.key)] = ub.value; + } + } } explicit IcebergManifestEntry(const c::manifest_entry_v1 &schema) { @@ -171,6 +264,10 @@ struct IcebergManifestEntry { file_path = schema.data_file_.file_path; file_format = schema.data_file_.file_format; record_count = schema.data_file_.record_count; + + // Initialize bounds as empty maps + lower_bounds.clear(); + upper_bounds.clear(); } IcebergManifestEntryStatusType status; @@ -181,6 +278,10 @@ struct IcebergManifestEntry { string file_format; int64_t record_count; + // Add new members for bounds + std::unordered_map> lower_bounds; + std::unordered_map> upper_bounds; + void Print() { Printer::Print(" -> ManifestEntry = { type: " + IcebergManifestEntryStatusTypeToString(status) + ", content: " + IcebergManifestEntryContentTypeToString(content) + ", file: " + file_path + @@ -209,4 +310,4 @@ struct IcebergTableEntry { } } }; -} // namespace duckdb +} // namespace duckdb \ No newline at end of file diff --git a/test/sql/iceberg_scan.test b/test/sql/iceberg_scan.test index 4f84bb7..56c6168 100644 --- a/test/sql/iceberg_scan.test +++ b/test/sql/iceberg_scan.test @@ -79,3 +79,27 @@ query I SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version='2', version_name_format='v%s%s.metadata.json'); ---- 111968 + +### Pushdown l_shipdate with results +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE) WHERE l_shipdate >= '1996-01-01'; +---- +21674 + +### Pushdown l_shipdate with no results +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', 
ALLOW_MOVED_PATHS=TRUE) WHERE l_shipdate >= '2222-01-01'; +---- +0 + +### Pushdown l_shipmode with results +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE) WHERE l_shipmode = 'AIR'; +---- +7265 + +### Pushdown l_shipmode with no results +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE) WHERE l_shipmode = 'FOOBAR'; +---- +0
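Note on the pruning logic added in src/iceberg_functions/iceberg_scan.cpp: each manifest entry now exposes per-field lower_bounds and upper_bounds, which Iceberg serializes with its single-value binary encoding (little-endian fixed-width integers for int/long/date/timestamp values, raw UTF-8 bytes for strings). DeserializeBound turns those bytes into DuckDB Values, and EvaluatePredicateAgainstStatistics keeps a data file unless the bounds prove that no row can satisfy an extracted comparison predicate; entries with missing bounds, unsupported expressions, and predicates hidden under OR are all kept conservatively. The sketch below is a minimal Python restatement of that overlap test for illustration only - the column name and bound values are made up, and this is not the extension code itself:

import struct
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)

def decode_bound(raw, logical_type):
    # Iceberg single-value serialization: little-endian fixed-width integers.
    if logical_type == "int":
        return struct.unpack("<i", raw[:4])[0]
    if logical_type == "date":                  # stored as days since 1970-01-01
        return EPOCH + timedelta(days=struct.unpack("<i", raw[:4])[0])
    if logical_type in ("long", "timestamp"):   # timestamps: microseconds since epoch
        return struct.unpack("<q", raw[:8])[0]
    if logical_type == "string":
        return raw.decode("utf-8")
    raise NotImplementedError(logical_type)

def file_may_match(op, constant, lower, upper):
    # Conservative test: drop a file only when the bounds prove no row can match.
    if op == "=":
        return lower <= constant <= upper
    if op in (">", ">="):
        return constant <= upper
    if op in ("<", "<="):
        return constant >= lower
    return True   # unknown operator: keep the file

# Illustrative l_shipdate bounds for one data file: 1992-01-02 .. 1998-12-01
lower = decode_bound(struct.pack("<i", (date(1992, 1, 2) - EPOCH).days), "date")
upper = decode_bound(struct.pack("<i", (date(1998, 12, 1) - EPOCH).days), "date")
print(file_may_match(">=", date(2222, 1, 1), lower, upper))  # False -> file skipped
print(file_may_match(">=", date(1996, 1, 1), lower, upper))  # True  -> file read

Because the v1 manifest entry schema in iceberg_types.hpp is left without the bounds fields, entries read through the v1 path always carry empty lower_bounds/upper_bounds maps, so files tracked by v1 manifests are never pruned; the statistics-based skipping only takes effect for format-version 2 tables.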
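Regenerating the partitioned test data: generate_iceberg.py still takes three positional arguments - the scale factor, the destination path, and the Iceberg spec version (sys.argv[1], presumably sys.argv[2], and sys.argv[3]) - so an invocation along the lines of

python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_spec2_0_01 2

rebuilds a table with the new year(l_shipdate_date) / l_shipmode_string partitioning (the scale and destination above are placeholders, not values used by the test suite). The partition_spec variable defined just before the CREATE TABLE branches is the single place to adjust the partition transforms. Note also that the new shebang points at a machine-specific Anaconda install, so invoking the script through python3 explicitly, as above, avoids depending on that path.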