add predicate pushdown

formatting

fixed filename issue

fixed filename issue

updated formatting

updated formatting

added pred push tests

added pred push tests2

fixed formatting, removed /data

removed /data

Force remove tracked files from data/iceberg

Remove tracked files from data/iceberg
mike-luabase committed Oct 25, 2024
1 parent 0de4979 commit 136b5eb
Showing 10 changed files with 985 additions and 231 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -9,6 +9,8 @@ test/python/__pycache__/
.Rhistory
test/sql/tmp.test
data/iceberg/generated_*
data/iceberg/generated_*/
data/iceberg/
scripts/metastore_db/
scripts/derby.log
scripts/test-script-with-path.sql
6 changes: 4 additions & 2 deletions scripts/test_data_generator/generate_base_parquet.py
@@ -46,7 +46,8 @@
l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz,
l_comment as l_comment_string,
gen_random_uuid()::VARCHAR as uuid,
l_comment::BLOB as l_comment_blob
l_comment::BLOB as l_comment_blob,
l_shipmode as l_shipmode_string
FROM
lineitem;""");
elif (MODE.lower() == "default"):
@@ -67,7 +68,8 @@
l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz,
l_comment as l_comment_string,
gen_random_uuid()::UUID as uuid,
l_comment::BLOB as l_comment_blob
l_comment::BLOB as l_comment_blob,
l_shipmode as l_shipmode_string
FROM
lineitem;""");
else:
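The change above threads l_shipmode through the base parquet file as l_shipmode_string so it can later serve as an Iceberg partition column. As a minimal sketch of the same idea (not the repo's generator; scale factor, abbreviated column list, and output filename are illustrative assumptions):

# Minimal sketch, not the repo's generator: produce a small TPC-H lineitem
# parquet that carries l_shipmode through as l_shipmode_string.
# Scale factor, column list, and output filename are illustrative assumptions.
import duckdb

con = duckdb.connect()
con.execute("INSTALL tpch; LOAD tpch;")
con.execute("CALL dbgen(sf=0.001);")  # tiny lineitem table
con.execute("""
    COPY (
        SELECT
            l_shipdate::DATE AS l_shipdate_date,
            l_comment        AS l_comment_string,
            l_comment::BLOB  AS l_comment_blob,
            l_shipmode       AS l_shipmode_string
        FROM lineitem
    ) TO 'lineitem_base.parquet' (FORMAT PARQUET);
""")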
46 changes: 34 additions & 12 deletions scripts/test_data_generator/generate_iceberg.py
@@ -1,4 +1,4 @@
#!/usr/bin/python3
#!/Users/mritchie712/opt/anaconda3/bin/python
import pyspark
import pyspark.sql
import sys
@@ -8,15 +8,15 @@
from pathlib import Path

if (len(sys.argv) != 4 ):
print("Usage: generate_iceberg.py <SCALE_FACTOR> <DEST_PATH> <ICBERG_SPEC_VERSION>")
print("Usage: generate_iceberg.py <SCALE_FACTOR> <DEST_PATH> <ICEBERG_SPEC_VERSION>")
exit(1)

SCALE = sys.argv[1]
DEST_PATH = sys.argv[2]
ICEBERG_SPEC_VERSION = sys.argv[3]

PARQUET_SRC_FILE = f'{DEST_PATH}/base_file/file.parquet'
TABLE_NAME = "iceberg_catalog.pyspark_iceberg_table";
TABLE_NAME = "iceberg_catalog.pyspark_iceberg_table"
CWD = os.getcwd()
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

@@ -26,7 +26,7 @@
os.system(f"python3 {SCRIPT_DIR}/generate_base_parquet.py {SCALE} {CWD}/{DEST_PATH} spark")

###
### Configure everyone's favorite apache product
### Configure Spark with Iceberg
###
conf = pyspark.SparkConf()
conf.setMaster('local[*]')
@@ -42,23 +42,46 @@
sc.setLogLevel("ERROR")

###
### Create Iceberg table from dataset
### Create Iceberg table from dataset with partitioning
###
spark.read.parquet(PARQUET_SRC_FILE).createOrReplaceTempView('parquet_file_view');
spark.read.parquet(PARQUET_SRC_FILE).createOrReplaceTempView('parquet_file_view')

# Define your partition columns and transforms
partition_spec = "year(l_shipdate_date), l_shipmode_string" # Adjust 'l_shipdate_date' as needed

if ICEBERG_SPEC_VERSION == '1':
spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}') AS SELECT * FROM parquet_file_view");
create_table_sql = f"""
CREATE OR REPLACE TABLE {TABLE_NAME}
USING iceberg
PARTITIONED BY ({partition_spec})
TBLPROPERTIES (
'format-version' = '{ICEBERG_SPEC_VERSION}'
)
AS SELECT * FROM parquet_file_view
"""
elif ICEBERG_SPEC_VERSION == '2':
spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}', 'write.update.mode'='merge-on-read') AS SELECT * FROM parquet_file_view");
create_table_sql = f"""
CREATE OR REPLACE TABLE {TABLE_NAME}
USING iceberg
PARTITIONED BY ({partition_spec})
TBLPROPERTIES (
'format-version' = '{ICEBERG_SPEC_VERSION}',
'write.update.mode' = 'merge-on-read'
)
AS SELECT * FROM parquet_file_view
"""
else:
print(f"Are you from the future? Iceberg spec version '{ICEBERG_SPEC_VERSION}' is unbeknownst to me")
exit(1)

# Execute the CREATE TABLE statement
spark.sql(create_table_sql)

###
### Apply modifications to base table generating verification results between each step
###
update_files = [str(path) for path in Path(f'{SCRIPT_DIR}/updates_v{ICEBERG_SPEC_VERSION}').rglob('*.sql')]
update_files.sort() # Order matters obviously
update_files.sort() # Order matters obviously
last_file = ""

for path in update_files:
@@ -82,17 +105,16 @@

# Create copy of table
df = spark.read.table(TABLE_NAME)
df.write.parquet(f"{DEST_PATH}/expected_results/{file_trimmed}/data");
df.write.parquet(f"{DEST_PATH}/expected_results/{file_trimmed}/data")

# For documentation, also write the query we executed to the data
query_path = f'{DEST_PATH}/expected_results/{file_trimmed}/query.sql'
with open(query_path, 'w') as f:
f.write("-- The query executed at this step:\n")
f.write(query)


###
### Finally, we copy the latest results to a "final" dir for easy test writing
###
import shutil
shutil.copytree(f"{DEST_PATH}/expected_results/{last_file}", f"{DEST_PATH}/expected_results/last")
shutil.copytree(f"{DEST_PATH}/expected_results/{last_file}", f"{DEST_PATH}/expected_results/last")
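With the table now partitioned by year(l_shipdate_date) and l_shipmode_string, filters on those columns are exactly what predicate pushdown can exploit: partition values recorded in the manifests let whole data files be skipped instead of read and filtered row by row. A rough sketch of such a query through DuckDB's iceberg extension (the table path is an assumed DEST_PATH, not something taken from this commit):

# Sketch only: scan the generated Iceberg table from DuckDB with filters on the
# partition columns, the case predicate pushdown is meant to accelerate.
# The table path is an assumed DEST_PATH; point it at the generator's output.
import duckdb

con = duckdb.connect()
con.execute("INSTALL iceberg; LOAD iceberg;")
rows = con.execute("""
    SELECT count(*)
    FROM iceberg_scan('data/iceberg/pyspark_iceberg_table', allow_moved_paths = true)
    WHERE l_shipmode_string = 'AIR'
      AND l_shipdate_date >= DATE '1995-01-01'
""").fetchall()
print(rows)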
4 changes: 2 additions & 2 deletions src/common/iceberg.cpp
@@ -80,8 +80,8 @@ vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(const string &pat
}
} else {
auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA);
avro::DataFileReader<c::manifest_entry> dfr(std::move(stream), schema);
c::manifest_entry manifest_entry;
avro::DataFileReader<manifest_entry> dfr(std::move(stream), schema);
manifest_entry manifest_entry;
while (dfr.read(manifest_entry)) {
ret.emplace_back(IcebergManifestEntry(manifest_entry));
}
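The manifest reader touched here is what exposes per-file partition values and statistics to the scan, which is where pushed-down predicates get applied. As a rough Python analogue of the avro::DataFileReader loop (not the extension's code; the manifest path is a hypothetical example and the field names follow the Iceberg manifest_entry schema):

# Rough illustration, not the extension's code: read Iceberg manifest entries
# with fastavro, the Python analogue of the avro::DataFileReader loop above.
# The manifest path below is a hypothetical example from a generated table.
from fastavro import reader

manifest_path = "data/iceberg/pyspark_iceberg_table/metadata/manifest-example.avro"
with open(manifest_path, "rb") as f:
    for entry in reader(f):
        # Each record is a manifest_entry: status, snapshot_id, and a nested
        # data_file struct with file_path, partition values, record counts, etc.
        print(entry["status"], entry["data_file"]["file_path"])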
