From d71c436ae2006843dc720bfdfcb8b3aeb434815e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 13 May 2024 10:00:18 -0400 Subject: [PATCH] Add examples from TPC-H (#666) * Update location of docker image * Initial commit for queries 1-3 * Commit queries 4-7 of TPC-H in examples * Add required license text * Add additional text around why to use a case statement in the example * add market share example * Add example for product type profit measure * Inital commit returned item report * Linting * Initial commit of q11 example * Initial commit of q12 from tpc-h * Initial commit for customer distribution example * Initial commit of promotion effect example * Initial commit of q15 in tph-c, top supplier * Initial commit of q16 in tph-c, part supplier relationship * Initial commit of q17 in tph-c, small quatity order * Initial commit of q18 in tph-c, large volume customer * Initial commit of q19 in tph-c, discounted revenue * Initial commit of q20 in tph-c, potential part promotion * Initial commit of q21 in tph-c, supplier who kept order waiting * Initial commit of q22 in tph-c, global sales opportunity * Adding readme information and marking text as copyrighted * Minimum part cost must be identified per part not across all parts that match the filters * Change ordering of output rows to match spec * Set parameter to match spec * Set parameter to match spec * setting values to match spec * Linting * Expand on readme to link to examples within tpch folder * Minor typo --- benchmarks/tpch/tpch-gen.sh | 6 +- examples/README.md | 64 +++++++ examples/tpch/.gitignore | 2 + examples/tpch/README.md | 57 ++++++ examples/tpch/convert_data_to_parquet.py | 142 ++++++++++++++ examples/tpch/q01_pricing_summary_report.py | 90 +++++++++ examples/tpch/q02_minimum_cost_supplier.py | 139 ++++++++++++++ examples/tpch/q03_shipping_priority.py | 86 +++++++++ examples/tpch/q04_order_priority_checking.py | 80 ++++++++ examples/tpch/q05_local_supplier_volume.py | 102 ++++++++++ .../tpch/q06_forecasting_revenue_change.py | 87 +++++++++ examples/tpch/q07_volume_shipping.py | 123 ++++++++++++ examples/tpch/q08_market_share.py | 175 ++++++++++++++++++ .../tpch/q09_product_type_profit_measure.py | 93 ++++++++++ examples/tpch/q10_returned_item_reporting.py | 108 +++++++++++ .../q11_important_stock_identification.py | 82 ++++++++ examples/tpch/q12_ship_mode_order_priority.py | 112 +++++++++++ examples/tpch/q13_customer_distribution.py | 64 +++++++ examples/tpch/q14_promotion_effect.py | 81 ++++++++ examples/tpch/q15_top_supplier.py | 87 +++++++++ .../tpch/q16_part_supplier_relationship.py | 85 +++++++++ examples/tpch/q17_small_quantity_order.py | 69 +++++++ examples/tpch/q18_large_volume_customer.py | 65 +++++++ examples/tpch/q19_discounted_revenue.py | 137 ++++++++++++++ examples/tpch/q20_potential_part_promotion.py | 97 ++++++++++ .../tpch/q21_suppliers_kept_orders_waiting.py | 114 ++++++++++++ examples/tpch/q22_global_sales_opportunity.py | 76 ++++++++ 27 files changed, 2420 insertions(+), 3 deletions(-) create mode 100644 examples/tpch/.gitignore create mode 100644 examples/tpch/README.md create mode 100644 examples/tpch/convert_data_to_parquet.py create mode 100644 examples/tpch/q01_pricing_summary_report.py create mode 100644 examples/tpch/q02_minimum_cost_supplier.py create mode 100644 examples/tpch/q03_shipping_priority.py create mode 100644 examples/tpch/q04_order_priority_checking.py create mode 100644 examples/tpch/q05_local_supplier_volume.py create mode 100644 examples/tpch/q06_forecasting_revenue_change.py create mode 100644 examples/tpch/q07_volume_shipping.py create mode 100644 examples/tpch/q08_market_share.py create mode 100644 examples/tpch/q09_product_type_profit_measure.py create mode 100644 examples/tpch/q10_returned_item_reporting.py create mode 100644 examples/tpch/q11_important_stock_identification.py create mode 100644 examples/tpch/q12_ship_mode_order_priority.py create mode 100644 examples/tpch/q13_customer_distribution.py create mode 100644 examples/tpch/q14_promotion_effect.py create mode 100644 examples/tpch/q15_top_supplier.py create mode 100644 examples/tpch/q16_part_supplier_relationship.py create mode 100644 examples/tpch/q17_small_quantity_order.py create mode 100644 examples/tpch/q18_large_volume_customer.py create mode 100644 examples/tpch/q19_discounted_revenue.py create mode 100644 examples/tpch/q20_potential_part_promotion.py create mode 100644 examples/tpch/q21_suppliers_kept_orders_waiting.py create mode 100644 examples/tpch/q22_global_sales_opportunity.py diff --git a/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch/tpch-gen.sh index e27472a3..15cab12a 100755 --- a/benchmarks/tpch/tpch-gen.sh +++ b/benchmarks/tpch/tpch-gen.sh @@ -29,7 +29,7 @@ FILE=./data/supplier.tbl if test -f "$FILE"; then echo "$FILE exists." else - docker run -v `pwd`/data:/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s $1 + docker run -v `pwd`/data:/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s $1 # workaround for https://github.com/apache/arrow-datafusion/issues/6147 mv data/customer.tbl data/customer.csv @@ -49,5 +49,5 @@ FILE=./data/answers/q1.out if test -f "$FILE"; then echo "$FILE exists." else - docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/" -fi \ No newline at end of file + docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/" +fi diff --git a/examples/README.md b/examples/README.md index 82405955..0ef194af 100644 --- a/examples/README.md +++ b/examples/README.md @@ -52,3 +52,67 @@ Here is a direct link to the file used in the examples: - [Executing SQL on Polars](./sql-on-polars.py) - [Executing SQL on Pandas](./sql-on-pandas.py) - [Executing SQL on cuDF](./sql-on-cudf.py) + +## TPC-H Examples + +Within the subdirectory `tpch` there are 22 examples that reproduce queries in +the TPC-H specification. These include realistic data that can be generated at +arbitrary scale and allow the user to see use cases for a variety of data frame +operations. + +In the list below we describe which new operations can be found in the examples. +The queries are designed to be of increasing complexity, so it is recommended to +review them in order. For brevity, the following list does not include operations +found in previous examples. + +- [Convert CSV to Parquet](./tpch/convert_data_to_parquet.py) + - Read from a CSV files where the delimiter is something other than a comma + - Specify schema during CVS reading + - Write to a parquet file +- [Pricing Summary Report](./tpch/q01_pricing_summary_report.py) + - Aggregation computing the maximum value, average, sum, and number of entries + - Filter data by date and interval + - Sorting +- [Minimum Cost Supplier](./tpch/q02_minimum_cost_supplier.py) + - Window operation to find minimum + - Sorting in descending order +- [Shipping Priority](./tpch/q03_shipping_priority.py) +- [Order Priority Checking](./tpch/q04_order_priority_checking.py) + - Aggregating multiple times in one data frame +- [Local Supplier Volume](./tpch/q05_local_supplier_volume.py) +- [Forecasting Revenue Change](./tpch/q06_forecasting_revenue_change.py) + - Using collect and extracting values as a python object +- [Volume Shipping](./tpch/q07_volume_shipping.py) + - Finding multiple distinct and mutually exclusive values within one dataframe + - Using `case` and `when` statements +- [Market Share](./tpch/q08_market_share.py) + - The operations in this query are similar to those in the prior examples, but + it is a more complex example of using filters, joins, and aggregates + - Using left outer joins +- [Product Type Profit Measure](./tpch/q09_product_type_profit_measure.py) + - Extract year from a date +- [Returned Item Reporting](./tpch/q10_returned_item_reporting.py) +- [Important Stock Identification](./tpch/q11_important_stock_identification.py) +- [Shipping Modes and Order](./tpch/q12_ship_mode_order_priority.py) + - Finding non-null values using a boolean operation in a filter + - Case statement with default value +- [Customer Distribution](./tpch/q13_customer_distribution.py) +- [Promotion Effect](./tpch/q14_promotion_effect.py) +- [Top Supplier](./tpch/q15_top_supplier.py) +- [Parts/Supplier Relationship](./tpch/q16_part_supplier_relationship.py) + - Using anti joins + - Using regular expressions (regex) + - Creating arrays of literal values + - Determine if an element exists within an array +- [Small-Quantity-Order Revenue](./tpch/q17_small_quantity_order.py) +- [Large Volume Customer](./tpch/q18_large_volume_customer.py) +- [Discounted Revenue](./tpch/q19_discounted_revenue.py) + - Creating a user defined function (UDF) + - Convert pyarrow Array to python values + - Filtering based on a UDF +- [Potential Part Promotion](./tpch/q20_potential_part_promotion.py) + - Extracting part of a string using substr +- [Suppliers Who Kept Orders Waiting](./tpch/q21_suppliers_kept_orders_waiting.py) + - Using array aggregation + - Determining the size of array elements +- [Global Sales Opportunity](./tpch/q22_global_sales_opportunity.py) diff --git a/examples/tpch/.gitignore b/examples/tpch/.gitignore new file mode 100644 index 00000000..9e67bd47 --- /dev/null +++ b/examples/tpch/.gitignore @@ -0,0 +1,2 @@ +data + diff --git a/examples/tpch/README.md b/examples/tpch/README.md new file mode 100644 index 00000000..7c52c823 --- /dev/null +++ b/examples/tpch/README.md @@ -0,0 +1,57 @@ + + +# DataFusion Python Examples for TPC-H + +These examples reproduce the problems listed in the Transaction Process Council +TPC-H benchmark. The purpose of these examples is to demonstrate how to use +different aspects of Data Fusion and not necessarily geared towards creating the +most performant queries possible. Within each example is a description of the +problem. For users who are familiar with SQL style commands, you can compare the +approaches in these examples with those listed in the specification. + +- https://www.tpc.org/tpch/ + +The examples provided are based on version 2.18.0 of the TPC-H specification. + +## Data Setup + +To run these examples, you must first generate a dataset. The `dbgen` tool +provided by TPC can create datasets of arbitrary scale. For testing it is +typically sufficient to create a 1 gigabyte dataset. For convenience, this +repository has a script which uses docker to create this dataset. From the +`benchmarks/tpch` directory execute the following script. + +```bash +./tpch-gen.sh 1 +``` + +The examples provided use parquet files for the tables generated by `dbgen`. +A python script is provided to convert the text files from `dbgen` into parquet +files expected by the examples. From the `examples/tpch` directory you can +execute the following command to create the necessary parquet files. + +```bash +python convert_data_to_parquet.py +``` + +## Description of Examples + +For easier access, a description of the techniques demonstrated in each file +is in the README.md file in the `examples` directory. diff --git a/examples/tpch/convert_data_to_parquet.py b/examples/tpch/convert_data_to_parquet.py new file mode 100644 index 00000000..178b7fb3 --- /dev/null +++ b/examples/tpch/convert_data_to_parquet.py @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +This is a utility function that will consumer the data generated by dbgen from TPC-H and convert +it into a parquet file with the column names as expected by the TPC-H specification. It assumes +the data generated resides in a path ../../benchmarks/tpch/data relative to the current file, +as will be generated by the script provided in this repository. +""" + +import os +import pyarrow +import datafusion + +ctx = datafusion.SessionContext() + +all_schemas = {} + +all_schemas["customer"] = [ + ("C_CUSTKEY", pyarrow.int32()), + ("C_NAME", pyarrow.string()), + ("C_ADDRESS", pyarrow.string()), + ("C_NATIONKEY", pyarrow.int32()), + ("C_PHONE", pyarrow.string()), + ("C_ACCTBAL", pyarrow.float32()), + ("C_MKTSEGMENT", pyarrow.string()), + ("C_COMMENT", pyarrow.string()), +] + +all_schemas["lineitem"] = [ + ("L_ORDERKEY", pyarrow.int32()), + ("L_PARTKEY", pyarrow.int32()), + ("L_SUPPKEY", pyarrow.int32()), + ("L_LINENUMBER", pyarrow.int32()), + ("L_QUANTITY", pyarrow.float32()), + ("L_EXTENDEDPRICE", pyarrow.float32()), + ("L_DISCOUNT", pyarrow.float32()), + ("L_TAX", pyarrow.float32()), + ("L_RETURNFLAG", pyarrow.string()), + ("L_LINESTATUS", pyarrow.string()), + ("L_SHIPDATE", pyarrow.date32()), + ("L_COMMITDATE", pyarrow.date32()), + ("L_RECEIPTDATE", pyarrow.date32()), + ("L_SHIPINSTRUCT", pyarrow.string()), + ("L_SHIPMODE", pyarrow.string()), + ("L_COMMENT", pyarrow.string()), +] + +all_schemas["nation"] = [ + ("N_NATIONKEY", pyarrow.int32()), + ("N_NAME", pyarrow.string()), + ("N_REGIONKEY", pyarrow.int32()), + ("N_COMMENT", pyarrow.string()), +] + +all_schemas["orders"] = [ + ("O_ORDERKEY", pyarrow.int32()), + ("O_CUSTKEY", pyarrow.int32()), + ("O_ORDERSTATUS", pyarrow.string()), + ("O_TOTALPRICE", pyarrow.float32()), + ("O_ORDERDATE", pyarrow.date32()), + ("O_ORDERPRIORITY", pyarrow.string()), + ("O_CLERK", pyarrow.string()), + ("O_SHIPPRIORITY", pyarrow.int32()), + ("O_COMMENT", pyarrow.string()), +] + +all_schemas["part"] = [ + ("P_PARTKEY", pyarrow.int32()), + ("P_NAME", pyarrow.string()), + ("P_MFGR", pyarrow.string()), + ("P_BRAND", pyarrow.string()), + ("P_TYPE", pyarrow.string()), + ("P_SIZE", pyarrow.int32()), + ("P_CONTAINER", pyarrow.string()), + ("P_RETAILPRICE", pyarrow.float32()), + ("P_COMMENT", pyarrow.string()), +] + +all_schemas["partsupp"] = [ + ("PS_PARTKEY", pyarrow.int32()), + ("PS_SUPPKEY", pyarrow.int32()), + ("PS_AVAILQTY", pyarrow.int32()), + ("PS_SUPPLYCOST", pyarrow.float32()), + ("PS_COMMENT", pyarrow.string()), +] + +all_schemas["region"] = [ + ("r_REGIONKEY", pyarrow.int32()), + ("r_NAME", pyarrow.string()), + ("r_COMMENT", pyarrow.string()), +] + +all_schemas["supplier"] = [ + ("S_SUPPKEY", pyarrow.int32()), + ("S_NAME", pyarrow.string()), + ("S_ADDRESS", pyarrow.string()), + ("S_NATIONKEY", pyarrow.int32()), + ("S_PHONE", pyarrow.string()), + ("S_ACCTBAL", pyarrow.float32()), + ("S_COMMENT", pyarrow.string()), +] + +curr_dir = os.path.dirname(os.path.abspath(__file__)) +for filename, curr_schema in all_schemas.items(): + + # For convenience, go ahead and convert the schema column names to lowercase + curr_schema = [(s[0].lower(), s[1]) for s in curr_schema] + + # Pre-collect the output columns so we can ignore the null field we add + # in to handle the trailing | in the file + output_cols = [r[0] for r in curr_schema] + + # Trailing | requires extra field for in processing + curr_schema.append(("some_null", pyarrow.null())) + + schema = pyarrow.schema(curr_schema) + + source_file = os.path.abspath( + os.path.join(curr_dir, f"../../benchmarks/tpch/data/{filename}.csv") + ) + dest_file = os.path.abspath(os.path.join(curr_dir, f"./data/{filename}.parquet")) + + df = ctx.read_csv(source_file, schema=schema, has_header=False, delimiter="|") + + df = df.select_columns(*output_cols) + + df.write_parquet(dest_file, compression="snappy") diff --git a/examples/tpch/q01_pricing_summary_report.py b/examples/tpch/q01_pricing_summary_report.py new file mode 100644 index 00000000..1aafccab --- /dev/null +++ b/examples/tpch/q01_pricing_summary_report.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 1: + +The Pricing Summary Report Query provides a summary pricing report for all lineitems shipped as of +a given date. The date is within 60 - 120 days of the greatest ship date contained in the database. +The query lists totals for extended price, discounted extended price, discounted extended price +plus tax, average quantity, average extended price, and average discount. These aggregates are +grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS. +A count of the number of lineitems in each group is included. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +ctx = SessionContext() + +df = ctx.read_parquet("data/lineitem.parquet") + +# It may be that the date can be hard coded, based on examples shown. +# This approach will work with any date range in the provided data set. + +greatest_ship_date = df.aggregate( + [], [F.max(col("l_shipdate")).alias("shipdate")] +).collect()[0]["shipdate"][0] + +# From the given problem, this is how close to the last date in the database we +# want to report results for. It should be between 60-120 days before the end. +DAYS_BEFORE_FINAL = 68 + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), type=pa.month_day_nano_interval()) + +print("Final date in database:", greatest_ship_date) + +# Filter data to the dates of interest +df = df.filter(col("l_shipdate") <= lit(greatest_ship_date) - lit(interval)) + +# Aggregate the results + +df = df.aggregate( + [col("l_returnflag"), col("l_linestatus")], + [ + F.sum(col("l_quantity")).alias("sum_qty"), + F.sum(col("l_extendedprice")).alias("sum_base_price"), + F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias( + "sum_disc_price" + ), + F.sum( + col("l_extendedprice") + * (lit(1.0) - col("l_discount")) + * (lit(1.0) + col("l_tax")) + ).alias("sum_charge"), + F.avg(col("l_quantity")).alias("avg_qty"), + F.avg(col("l_extendedprice")).alias("avg_price"), + F.avg(col("l_discount")).alias("avg_disc"), + F.count(col("l_returnflag")).alias( + "count_order" + ), # Counting any column should return same result + ], +) + +# Sort per the expected result + +df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort()) + +# Note: There appears to be a discrepancy between what is returned here and what is in the generated +# answers file for the case of return flag N and line status O, but I did not investigate further. + +df.show() diff --git a/examples/tpch/q02_minimum_cost_supplier.py b/examples/tpch/q02_minimum_cost_supplier.py new file mode 100644 index 00000000..262e2cf4 --- /dev/null +++ b/examples/tpch/q02_minimum_cost_supplier.py @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 2: + +The Minimum Cost Supplier Query finds, in a given region, for each part of a certain type and size, +the supplier who can supply it at minimum cost. If several suppliers in that region offer the +desired part type and size at the same (minimum) cost, the query lists the parts from suppliers with +the 100 highest account balances. For each supplier, the query lists the supplier's account balance, +name and nation; the part's number and manufacturer; the supplier's address, phone number and +comment information. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +import datafusion +from datafusion import SessionContext, col, lit, functions as F + +# This is the part we're looking for +SIZE_OF_INTEREST = 15 +TYPE_OF_INTEREST = "BRASS" +REGION_OF_INTEREST = "EUROPE" + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns( + "p_partkey", "p_mfgr", "p_type", "p_size" +) +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_acctbal", + "s_name", + "s_address", + "s_phone", + "s_comment", + "s_nationkey", + "s_suppkey", +) +df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns( + "ps_partkey", "ps_suppkey", "ps_supplycost" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_regionkey", "n_name" +) +df_region = ctx.read_parquet("data/region.parquet").select_columns( + "r_regionkey", "r_name" +) + +# Filter down parts. Part names contain the type of interest, so we can use strpos to find where +# in the p_type column the word is. `strpos` will return 0 if not found, otherwise the position +# in the string where it is located. + +df_part = df_part.filter( + F.strpos(col("p_type"), lit(TYPE_OF_INTEREST)) > lit(0) +).filter(col("p_size") == lit(SIZE_OF_INTEREST)) + +# Filter regions down to the one of interest + +df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST)) + +# Now that we have the region, find suppliers in that region. Suppliers are tied to their nation +# and nations are tied to the region. + +df_nation = df_nation.join(df_region, (["n_regionkey"], ["r_regionkey"]), how="inner") +df_supplier = df_supplier.join( + df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner" +) + +# Now that we know who the potential suppliers are for the part, we can limit out part +# supplies table down. We can further join down to the specific parts we've identified +# as matching the request + +df = df_partsupp.join(df_supplier, (["ps_suppkey"], ["s_suppkey"]), how="inner") + +# Locate the minimum cost across all suppliers. There are multiple ways you could do this, +# but one way is to create a window function across all suppliers, find the minimum, and +# create a column of that value. We can then filter down any rows for which the cost and +# minimum do not match. + +# The default window frame as of 5/6/2024 is from unbounded preceeding to the current row. +# We want to evaluate the entire data frame, so we specify this. +window_frame = datafusion.WindowFrame("rows", None, None) +df = df.with_column( + "min_cost", + F.window( + "min", + [col("ps_supplycost")], + partition_by=[col("ps_partkey")], + window_frame=window_frame, + ), +) + +df = df.filter(col("min_cost") == col("ps_supplycost")) + +df = df.join(df_part, (["ps_partkey"], ["p_partkey"]), how="inner") + +# From the problem statement, these are the values we wish to output + +df = df.select_columns( + "s_acctbal", + "s_name", + "n_name", + "p_partkey", + "p_mfgr", + "s_address", + "s_phone", + "s_comment", +) + +# Sort and display 100 entries +df = df.sort( + col("s_acctbal").sort(ascending=False), + col("n_name").sort(), + col("s_name").sort(), + col("p_partkey").sort(), +) + +df = df.limit(100) + +# Show results + +df.show() diff --git a/examples/tpch/q03_shipping_priority.py b/examples/tpch/q03_shipping_priority.py new file mode 100644 index 00000000..78993e9e --- /dev/null +++ b/examples/tpch/q03_shipping_priority.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 3: + +The Shipping Priority Query retrieves the shipping priority and potential revenue, defined as the +sum of l_extendedprice * (1-l_discount), of the orders having the largest revenue among those that +had not been shipped as of a given date. Orders are listed in decreasing order of revenue. If more +than 10 unshipped orders exist, only the 10 orders with the largest revenue are listed. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, col, lit, functions as F + +SEGMENT_OF_INTEREST = "BUILDING" +DATE_OF_INTEREST = "1995-03-15" + +# Load the dataframes we need + +ctx = SessionContext() + +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_mktsegment", "c_custkey" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderdate", "o_shippriority", "o_custkey", "o_orderkey" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_orderkey", "l_extendedprice", "l_discount", "l_shipdate" +) + +# Limit dataframes to the rows of interest + +df_customer = df_customer.filter(col("c_mktsegment") == lit(SEGMENT_OF_INTEREST)) +df_orders = df_orders.filter(col("o_orderdate") < lit(DATE_OF_INTEREST)) +df_lineitem = df_lineitem.filter(col("l_shipdate") > lit(DATE_OF_INTEREST)) + +# Join all 3 dataframes + +df = df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="inner").join( + df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner" +) + +# Compute the revenue + +df = df.aggregate( + [col("l_orderkey")], + [ + F.first_value(col("o_orderdate")).alias("o_orderdate"), + F.first_value(col("o_shippriority")).alias("o_shippriority"), + F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias("revenue"), + ], +) + +# Sort by priority + +df = df.sort(col("revenue").sort(ascending=False), col("o_orderdate").sort()) + +# Only return 100 results + +df = df.limit(100) + +# Change the order that the columns are reported in just to match the spec + +df = df.select_columns("l_orderkey", "revenue", "o_orderdate", "o_shippriority") + +# Show result + +df.show() diff --git a/examples/tpch/q04_order_priority_checking.py b/examples/tpch/q04_order_priority_checking.py new file mode 100644 index 00000000..b691d5b1 --- /dev/null +++ b/examples/tpch/q04_order_priority_checking.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 4: + +The Order Priority Checking Query counts the number of orders ordered in a given quarter of a given +year in which at least one lineitem was received by the customer later than its committed date. The +query lists the count of such orders for each order priority sorted in ascending priority order. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +# Ideally we could put 3 months into the interval. See note below. +INTERVAL_DAYS = 92 +DATE_OF_INTEREST = "1993-07-01" + +# Load the dataframes we need + +ctx = SessionContext() + +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderdate", "o_orderpriority", "o_orderkey" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_orderkey", "l_commitdate", "l_receiptdate" +) + +# Create a date object from the string +date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date() + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval()) + +# Limit results to cases where commitment date before receipt date +# Aggregate the results so we only get one row to join with the order table. +# Alterately, and likely more idomatic is instead of `.aggregate` you could +# do `.select_columns("l_orderkey").distinct()`. The goal here is to show +# mulitple examples of how to use Data Fusion. +df_lineitem = df_lineitem.filter(col("l_commitdate") < col("l_receiptdate")).aggregate( + [col("l_orderkey")], [] +) + +# Limit orders to date range of interest +df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter( + col("o_orderdate") < lit(date) + lit(interval) +) + +# Perform the join to find only orders for which there are lineitems outside of expected range +df = df_orders.join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner") + +# Based on priority, find the number of entries +df = df.aggregate( + [col("o_orderpriority")], [F.count(col("o_orderpriority")).alias("order_count")] +) + +# Sort the results +df = df.sort(col("o_orderpriority").sort()) + +df.show() diff --git a/examples/tpch/q05_local_supplier_volume.py b/examples/tpch/q05_local_supplier_volume.py new file mode 100644 index 00000000..7cb6e632 --- /dev/null +++ b/examples/tpch/q05_local_supplier_volume.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 5: + +The Local Supplier Volume Query lists for each nation in a region the revenue volume that resulted +from lineitem transactions in which the customer ordering parts and the supplier filling them were +both within that nation. The query is run in order to determine whether to institute local +distribution centers in a given region. The query considers only parts ordered in a given year. The +query displays the nations and revenue volume in descending order by revenue. Revenue volume for all +qualifying lineitems in a particular nation is defined as sum(l_extendedprice * (1 - l_discount)). + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + + +DATE_OF_INTEREST = "1994-01-01" +INTERVAL_DAYS = 365 +REGION_OF_INTEREST = "ASIA" + +date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date() + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval()) + +# Load the dataframes we need + +ctx = SessionContext() + +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_custkey", "c_nationkey" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_custkey", "o_orderkey", "o_orderdate" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_orderkey", "l_suppkey", "l_extendedprice", "l_discount" +) +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_nationkey" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_regionkey", "n_name" +) +df_region = ctx.read_parquet("data/region.parquet").select_columns( + "r_regionkey", "r_name" +) + +# Restrict dataframes to cases of interest +df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter( + col("o_orderdate") < lit(date) + lit(interval) +) + +df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST)) + +# Join all the dataframes + +df = ( + df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="inner") + .join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner") + .join( + df_supplier, + (["l_suppkey", "c_nationkey"], ["s_suppkey", "s_nationkey"]), + how="inner", + ) + .join(df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner") + .join(df_region, (["n_regionkey"], ["r_regionkey"]), how="inner") +) + +# Compute the final result + +df = df.aggregate( + [col("n_name")], + [F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias("revenue")], +) + +# Sort in descending order + +df = df.sort(col("revenue").sort(ascending=False)) + +df.show() diff --git a/examples/tpch/q06_forecasting_revenue_change.py b/examples/tpch/q06_forecasting_revenue_change.py new file mode 100644 index 00000000..5fbb9177 --- /dev/null +++ b/examples/tpch/q06_forecasting_revenue_change.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 6: + +The Forecasting Revenue Change Query considers all the lineitems shipped in a given year with +discounts between DISCOUNT-0.01 and DISCOUNT+0.01. The query lists the amount by which the total +revenue would have increased if these discounts had been eliminated for lineitems with l_quantity +less than quantity. Note that the potential revenue increase is equal to the sum of +[l_extendedprice * l_discount] for all lineitems with discounts and quantities in the qualifying +range. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +# Variables from the example query + +DATE_OF_INTEREST = "1994-01-01" +DISCOUT = 0.06 +DELTA = 0.01 +QUANTITY = 24 + +INTERVAL_DAYS = 365 + +date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date() + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval()) + +# Load the dataframes we need + +ctx = SessionContext() + +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_shipdate", "l_quantity", "l_extendedprice", "l_discount" +) + +# Filter down to lineitems of interest + +df = ( + df_lineitem.filter(col("l_shipdate") >= lit(date)) + .filter(col("l_shipdate") < lit(date) + lit(interval)) + .filter(col("l_discount") >= lit(DISCOUT) - lit(DELTA)) + .filter(col("l_discount") <= lit(DISCOUT) + lit(DELTA)) + .filter(col("l_quantity") < lit(QUANTITY)) +) + +# Add up all the "lost" revenue + +df = df.aggregate( + [], [F.sum(col("l_extendedprice") * col("l_discount")).alias("revenue")] +) + +# Show the single result. We could do a `show()` but since we want to demonstrate features of how +# to use Data Fusion, instead collect the result as a python object and print it out. + +# collect() should give a list of record batches. This is a small query, so we should get a +# single batch back, hence the index [0]. Within each record batch we only care about the +# single column result `revenue`. Since we have only one row returned because we aggregated +# over the entire dataframe, we can index it at 0. Then convert the DoubleScalar into a +# simple python object. + +revenue = df.collect()[0]["revenue"][0].as_py() + +# Note: the output value from this query may be dependant on the size of the database generated +print(f"Potential lost revenue: {revenue:.2f}") diff --git a/examples/tpch/q07_volume_shipping.py b/examples/tpch/q07_volume_shipping.py new file mode 100644 index 00000000..3c87f937 --- /dev/null +++ b/examples/tpch/q07_volume_shipping.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 7: + +The Volume Shipping Query finds, for two given nations, the gross discounted revenues derived from +lineitems in which parts were shipped from a supplier in either nation to a customer in the other +nation during 1995 and 1996. The query lists the supplier nation, the customer nation, the year, +and the revenue from shipments that took place in that year. The query orders the answer by +Supplier nation, Customer nation, and year (all ascending). + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +# Variables of interest to query over + +nation_1 = lit("FRANCE") +nation_2 = lit("GERMANY") + +START_DATE = "1995-01-01" +END_DATE = "1996-12-31" + +start_date = lit(datetime.strptime(START_DATE, "%Y-%m-%d").date()) +end_date = lit(datetime.strptime(END_DATE, "%Y-%m-%d").date()) + + +# Load the dataframes we need + +ctx = SessionContext() + +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_nationkey" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_shipdate", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_custkey" +) +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_custkey", "c_nationkey" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name" +) + + +# Filter to time of interest +df_lineitem = df_lineitem.filter(col("l_shipdate") >= start_date).filter( + col("l_shipdate") <= end_date +) + + +# A simpler way to do the following operation is to use a filter, but we also want to demonstrate +# how to use case statements. Here we are assigning `n_name` to be itself when it is either of +# the two nations of interest. Since there is no `otherwise()` statement, any values that do +# not match these will result in a null value and then get filtered out. +# +# To do the same using a simle filter would be: +# df_nation = df_nation.filter((F.col("n_name") == nation_1) | (F.col("n_name") == nation_2)) +df_nation = df_nation.with_column( + "n_name", + F.case(col("n_name")) + .when(nation_1, col("n_name")) + .when(nation_2, col("n_name")) + .end(), +).filter(~col("n_name").is_null()) + + +# Limit suppliers to either nation +df_supplier = df_supplier.join( + df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner" +).select(col("s_suppkey"), col("n_name").alias("supp_nation")) + +# Limit customers to either nation +df_customer = df_customer.join( + df_nation, (["c_nationkey"], ["n_nationkey"]), how="inner" +).select(col("c_custkey"), col("n_name").alias("cust_nation")) + +# Join up all the data frames from line items, and make sure the supplier and customer are in +# different nations. +df = ( + df_lineitem.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner") + .join(df_customer, (["o_custkey"], ["c_custkey"]), how="inner") + .join(df_supplier, (["l_suppkey"], ["s_suppkey"]), how="inner") + .filter(col("cust_nation") != col("supp_nation")) +) + +# Extract out two values for every line item +df = df.with_column( + "l_year", F.datepart(lit("year"), col("l_shipdate")).cast(pa.int32()) +).with_column("volume", col("l_extendedprice") * (lit(1.0) - col("l_discount"))) + +# Aggregate the results +df = df.aggregate( + [col("supp_nation"), col("cust_nation"), col("l_year")], + [F.sum(col("volume")).alias("revenue")], +) + +# Sort based on problem statement requirements +df = df.sort(col("supp_nation").sort(), col("cust_nation").sort(), col("l_year").sort()) + +df.show() diff --git a/examples/tpch/q08_market_share.py b/examples/tpch/q08_market_share.py new file mode 100644 index 00000000..a415156e --- /dev/null +++ b/examples/tpch/q08_market_share.py @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 8: + +The market share for a given nation within a given region is defined as the fraction of the +revenue, the sum of [l_extendedprice * (1-l_discount)], from the products of a specified type in +that region that was supplied by suppliers from the given nation. The query determines this for the +years 1995 and 1996 presented in this order. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +supplier_nation = lit("BRAZIL") +customer_region = lit("AMERICA") +part_of_interest = lit("ECONOMY ANODIZED STEEL") + +START_DATE = "1995-01-01" +END_DATE = "1996-12-31" + +start_date = lit(datetime.strptime(START_DATE, "%Y-%m-%d").date()) +end_date = lit(datetime.strptime(END_DATE, "%Y-%m-%d").date()) + + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", "p_type") +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_nationkey" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_partkey", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_custkey", "o_orderdate" +) +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_custkey", "c_nationkey" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name", "n_regionkey" +) +df_region = ctx.read_parquet("data/region.parquet").select_columns( + "r_regionkey", "r_name" +) + +# Limit possible parts to the one specified +df_part = df_part.filter(col("p_type") == part_of_interest) + +# Limit orders to those in the specified range + +df_orders = df_orders.filter(col("o_orderdate") >= start_date).filter( + col("o_orderdate") <= end_date +) + +# Part 1: Find customers in the region + +# We want customers in region specified by region_of_interest. This will be used to compute +# the total sales of the part of interest. We want to know of those sales what fraction +# was supplied by the nation of interest. There is no guarantee that the nation of +# interest is within the region of interest. + +# First we find all the sales that make up the basis. + +df_regional_customers = df_region.filter(col("r_name") == customer_region) + +# After this join we have all of the possible sales nations +df_regional_customers = df_regional_customers.join( + df_nation, (["r_regionkey"], ["n_regionkey"]), how="inner" +) + +# Now find the possible customers +df_regional_customers = df_regional_customers.join( + df_customer, (["n_nationkey"], ["c_nationkey"]), how="inner" +) + +# Next find orders for these customers +df_regional_customers = df_regional_customers.join( + df_orders, (["c_custkey"], ["o_custkey"]), how="inner" +) + +# Find all line items from these orders +df_regional_customers = df_regional_customers.join( + df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner" +) + +# Limit to the part of interest +df_regional_customers = df_regional_customers.join( + df_part, (["l_partkey"], ["p_partkey"]), how="inner" +) + +# Compute the volume for each line item +df_regional_customers = df_regional_customers.with_column( + "volume", col("l_extendedprice") * (lit(1.0) - col("l_discount")) +) + +# Part 2: Find suppliers from the nation + +# Now that we have all of the sales of that part in the specified region, we need +# to determine which of those came from suppliers in the nation we are interested in. + +df_national_suppliers = df_nation.filter(col("n_name") == supplier_nation) + +# Determine the suppliers by the limited nation key we have in our single row df above +df_national_suppliers = df_national_suppliers.join( + df_supplier, (["n_nationkey"], ["s_nationkey"]), how="inner" +) + +# When we join to the customer dataframe, we don't want to confuse other columns, so only +# select the supplier key that we need +df_national_suppliers = df_national_suppliers.select_columns("s_suppkey") + + +# Part 3: Combine suppliers and customers and compute the market share + +# Now we can do a left outer join on the suppkey. Those line items from other suppliers +# will get a null value. We can check for the existence of this null to compute a volume +# column only from suppliers in the nation we are evaluating. + +df = df_regional_customers.join( + df_national_suppliers, (["l_suppkey"], ["s_suppkey"]), how="left" +) + +# Use a case statement to compute the volume sold by suppliers in the nation of interest +df = df.with_column( + "national_volume", + F.case(col("s_suppkey").is_null()) + .when(lit(False), col("volume")) + .otherwise(lit(0.0)), +) + +df = df.with_column( + "o_year", F.datepart(lit("year"), col("o_orderdate")).cast(pa.int32()) +) + + +# Lastly, sum up the results + +df = df.aggregate( + [col("o_year")], + [ + F.sum(col("volume")).alias("volume"), + F.sum(col("national_volume")).alias("national_volume"), + ], +) + +df = df.select( + col("o_year"), (F.col("national_volume") / F.col("volume")).alias("mkt_share") +) + +df = df.sort(col("o_year").sort()) + +df.show() diff --git a/examples/tpch/q09_product_type_profit_measure.py b/examples/tpch/q09_product_type_profit_measure.py new file mode 100644 index 00000000..4fdfc1cb --- /dev/null +++ b/examples/tpch/q09_product_type_profit_measure.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 9: + +The Product Type Profit Measure Query finds, for each nation and each year, the profit for all parts +ordered in that year that contain a specified substring in their names and that were filled by a +supplier in that nation. The profit is defined as the sum of +[(l_extendedprice*(1-l_discount)) - (ps_supplycost * l_quantity)] for all lineitems describing +parts in the specified line. The query lists the nations in ascending alphabetical order and, for +each nation, the year and profit in descending order by year (most recent first). + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +part_color = lit("green") + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", "p_name") +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_nationkey" +) +df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns( + "ps_suppkey", "ps_partkey", "ps_supplycost" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_partkey", + "l_extendedprice", + "l_discount", + "l_suppkey", + "l_orderkey", + "l_quantity", +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_custkey", "o_orderdate" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name", "n_regionkey" +) + +# Limit possible parts to the color specified +df = df_part.filter(F.strpos(col("p_name"), part_color) > lit(0)) + +# We have a series of joins that get us to limit down to the line items we need +df = df.join(df_lineitem, (["p_partkey"], ["l_partkey"]), how="inner") +df = df.join(df_supplier, (["l_suppkey"], ["s_suppkey"]), how="inner") +df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner") +df = df.join( + df_partsupp, (["l_suppkey", "l_partkey"], ["ps_suppkey", "ps_partkey"]), how="inner" +) +df = df.join(df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner") + +# Compute the intermediate values and limit down to the expressions we need +df = df.select( + col("n_name").alias("nation"), + F.datepart(lit("year"), col("o_orderdate")).cast(pa.int32()).alias("o_year"), + ( + col("l_extendedprice") * (lit(1.0) - col("l_discount")) + - (col("ps_supplycost") * col("l_quantity")) + ).alias("amount"), +) + +# Sum up the values by nation and year +df = df.aggregate( + [col("nation"), col("o_year")], [F.sum(col("amount")).alias("profit")] +) + +# Sort according to the problem specification +df = df.sort(col("nation").sort(), col("o_year").sort(ascending=False)) + +df.show() diff --git a/examples/tpch/q10_returned_item_reporting.py b/examples/tpch/q10_returned_item_reporting.py new file mode 100644 index 00000000..1879027c --- /dev/null +++ b/examples/tpch/q10_returned_item_reporting.py @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 10: + +The Returned Item Reporting Query finds the top 20 customers, in terms of their effect on lost +revenue for a given quarter, who have returned parts. The query considers only parts that were +ordered in the specified quarter. The query lists the customer's name, address, nation, phone +number, account balance, comment information and revenue lost. The customers are listed in +descending order of lost revenue. Revenue lost is defined as +sum(l_extendedprice*(1-l_discount)) for all qualifying lineitems. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +DATE_START_OF_QUARTER = "1993-10-01" + +date_start_of_quarter = lit(datetime.strptime(DATE_START_OF_QUARTER, "%Y-%m-%d").date()) + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval_one_quarter = lit(pa.scalar((0, 0, 120), type=pa.month_day_nano_interval())) + +# Load the dataframes we need + +ctx = SessionContext() + +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_custkey", + "c_nationkey", + "c_name", + "c_acctbal", + "c_address", + "c_phone", + "c_comment", +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_extendedprice", "l_discount", "l_orderkey", "l_returnflag" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_custkey", "o_orderdate" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name", "n_regionkey" +) + +# limit to returns +df_lineitem = df_lineitem.filter(col("l_returnflag") == lit("R")) + + +# Rather than aggregate by all of the customer fields as you might do looking at the specification, +# we can aggregate by o_custkey and then join in the customer data at the end. + +df = df_orders.filter(col("o_orderdate") >= date_start_of_quarter).filter( + col("o_orderdate") < date_start_of_quarter + interval_one_quarter +) + +df = df.join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner") + +# Compute the revenue +df = df.aggregate( + [col("o_custkey")], + [F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias("revenue")], +) + +# Now join in the customer data +df = df.join(df_customer, (["o_custkey"], ["c_custkey"]), how="inner") +df = df.join(df_nation, (["c_nationkey"], ["n_nationkey"]), how="inner") + +# These are the columns the problem statement requires +df = df.select_columns( + "c_custkey", + "c_name", + "revenue", + "c_acctbal", + "n_name", + "c_address", + "c_phone", + "c_comment", +) + +# Sort the results in descending order +df = df.sort(col("revenue").sort(ascending=False)) + +# Only return the top 20 results +df = df.limit(20) + +df.show() diff --git a/examples/tpch/q11_important_stock_identification.py b/examples/tpch/q11_important_stock_identification.py new file mode 100644 index 00000000..78fe26db --- /dev/null +++ b/examples/tpch/q11_important_stock_identification.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 11: + +The Important Stock Identification Query finds, from scanning the available stock of suppliers +in a given nation, all the parts that represent a significant percentage of the total value of +all available parts. The query displays the part number and the value of those parts in +descending order of value. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, WindowFrame, col, lit, functions as F + +NATION = "GERMANY" +FRACTION = 0.0001 + +# Load the dataframes we need + +ctx = SessionContext() + +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_nationkey" +) +df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns( + "ps_supplycost", "ps_availqty", "ps_suppkey", "ps_partkey" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name" +) + +# limit to returns +df_nation = df_nation.filter(col("n_name") == lit(NATION)) + +# Find part supplies of within this target nation + +df = df_nation.join(df_supplier, (["n_nationkey"], ["s_nationkey"]), how="inner") + +df = df.join(df_partsupp, (["s_suppkey"], ["ps_suppkey"]), how="inner") + + +# Compute the value of individual parts +df = df.with_column("value", col("ps_supplycost") * col("ps_availqty")) + +# Compute total value of specific parts +df = df.aggregate([col("ps_partkey")], [F.sum(col("value")).alias("value")]) + +# By default window functions go from unbounded preceeding to current row, but we want +# to compute this sum across all rows +window_frame = WindowFrame("rows", None, None) + +df = df.with_column( + "total_value", F.window("sum", [col("value")], window_frame=window_frame) +) + +# Limit to the parts for which there is a significant value based on the fraction of the total +df = df.filter(col("value") / col("total_value") > lit(FRACTION)) + +# We only need to report on these two columns +df = df.select_columns("ps_partkey", "value") + +# Sort in descending order of value +df = df.sort(col("value").sort(ascending=False)) + +df.show() diff --git a/examples/tpch/q12_ship_mode_order_priority.py b/examples/tpch/q12_ship_mode_order_priority.py new file mode 100644 index 00000000..e76efa54 --- /dev/null +++ b/examples/tpch/q12_ship_mode_order_priority.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 12: + +The Shipping Modes and Order Priority Query counts, by ship mode, for lineitems actually received +by customers in a given year, the number of lineitems belonging to orders for which the +l_receiptdate exceeds the l_commitdate for two different specified ship modes. Only lineitems that +were actually shipped before the l_commitdate are considered. The late lineitems are partitioned +into two groups, those with priority URGENT or HIGH, and those with a priority other than URGENT or +HIGH. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +SHIP_MODE_1 = "MAIL" +SHIP_MODE_2 = "SHIP" +DATE_OF_INTEREST = "1994-01-01" + +# Load the dataframes we need + +ctx = SessionContext() + +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_orderpriority" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_orderkey", "l_shipmode", "l_commitdate", "l_shipdate", "l_receiptdate" +) + +date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date() + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval()) + + +df = df_lineitem.filter(col("l_receiptdate") >= lit(date)).filter( + col("l_receiptdate") < lit(date) + lit(interval) +) + +# Note: It is not recommended to use array_has because it treats the second argument as an argument +# so if you pass it col("l_shipmode") it will pass the entire array to process which is very slow. +# Instead check the position of the entry is not null. +df = df.filter( + ~F.array_position( + F.make_array(lit(SHIP_MODE_1), lit(SHIP_MODE_2)), col("l_shipmode") + ).is_null() +) + +# Since we have only two values, it's much easier to do this as a filter where the l_shipmode +# matches either of the two values, but we want to show doing some array operations in this +# example. If you want to see this done with filters, comment out the above line and uncomment +# this one. +# df = df.filter((col("l_shipmode") == lit(SHIP_MODE_1)) | (col("l_shipmode") == lit(SHIP_MODE_2))) + + +# We need order priority, so join order df to line item +df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner") + +# Restrict to line items we care about based on the problem statement. +df = df.filter(col("l_commitdate") < col("l_receiptdate")) + +df = df.filter(col("l_shipdate") < col("l_commitdate")) + +df = df.with_column( + "high_line_value", + F.case(col("o_orderpriority")) + .when(lit("1-URGENT"), lit(1)) + .when(lit("2-HIGH"), lit(1)) + .otherwise(lit(0)), +) + +# Aggregate the results +df = df.aggregate( + [col("l_shipmode")], + [ + F.sum(col("high_line_value")).alias("high_line_count"), + F.count(col("high_line_value")).alias("all_lines_count"), + ], +) + +# Compute the final output +df = df.select( + col("l_shipmode"), + col("high_line_count"), + (col("all_lines_count") - col("high_line_count")).alias("low_line_count"), +) + +df = df.sort(col("l_shipmode").sort()) + +df.show() diff --git a/examples/tpch/q13_customer_distribution.py b/examples/tpch/q13_customer_distribution.py new file mode 100644 index 00000000..1eb9ca30 --- /dev/null +++ b/examples/tpch/q13_customer_distribution.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 13: + +This query determines the distribution of customers by the number of orders they have made, +including customers who have no record of orders, past or present. It counts and reports how many +customers have no orders, how many have 1, 2, 3, etc. A check is made to ensure that the orders +counted do not fall into one of several special categories of orders. Special categories are +identified in the order comment column by looking for a particular pattern. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, col, lit, functions as F + +WORD_1 = "special" +WORD_2 = "requests" + +# Load the dataframes we need + +ctx = SessionContext() + +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_custkey", "o_comment" +) +df_customer = ctx.read_parquet("data/customer.parquet").select_columns("c_custkey") + +# Use a regex to remove special cases +df_orders = df_orders.filter( + F.regexp_match(col("o_comment"), lit(f"{WORD_1}.?*{WORD_2}")).is_null() +) + +# Since we may have customers with no orders we must do a left join +df = df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="left") + +# Find the number of orders for each customer +df = df.aggregate([col("c_custkey")], [F.count(col("c_custkey")).alias("c_count")]) + +# Ultimately we want to know the number of customers that have that customer count +df = df.aggregate([col("c_count")], [F.count(col("c_count")).alias("custdist")]) + +# We want to order the results by the highest number of customers per count +df = df.sort( + col("custdist").sort(ascending=False), col("c_count").sort(ascending=False) +) + +df.show() diff --git a/examples/tpch/q14_promotion_effect.py b/examples/tpch/q14_promotion_effect.py new file mode 100644 index 00000000..9ec38366 --- /dev/null +++ b/examples/tpch/q14_promotion_effect.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 14: + +The Promotion Effect Query determines what percentage of the revenue in a given year and month was +derived from promotional parts. The query considers only parts actually shipped in that month and +gives the percentage. Revenue is defined as (l_extendedprice * (1-l_discount)). + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +DATE = "1995-09-01" + +date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date()) +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval_one_month = lit(pa.scalar((0, 0, 30), type=pa.month_day_nano_interval())) + +# Load the dataframes we need + +ctx = SessionContext() + +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_partkey", "l_shipdate", "l_extendedprice", "l_discount" +) +df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", "p_type") + + +# Check part type begins with PROMO +df_part = df_part.filter( + F.substr(col("p_type"), lit(0), lit(6)) == lit("PROMO") +).with_column("promo_factor", lit(1.0)) + +df_lineitem = df_lineitem.filter(col("l_shipdate") >= date_of_interest).filter( + col("l_shipdate") < date_of_interest + interval_one_month +) + +# Left join so we can sum up the promo parts different from other parts +df = df_lineitem.join(df_part, (["l_partkey"], ["p_partkey"]), "left") + +# Make a factor of 1.0 if it is a promotion, 0.0 otherwise +df = df.with_column("promo_factor", F.coalesce(col("promo_factor"), lit(0.0))) +df = df.with_column("revenue", col("l_extendedprice") * (lit(1.0) - col("l_discount"))) + + +# Sum up the promo and total revenue +df = df.aggregate( + [], + [ + F.sum(col("promo_factor") * col("revenue")).alias("promo_revenue"), + F.sum(col("revenue")).alias("total_revenue"), + ], +) + +# Return the percentage of revenue from promotions +df = df.select( + (lit(100.0) * col("promo_revenue") / col("total_revenue")).alias("promo_revenue") +) + +df.show() diff --git a/examples/tpch/q15_top_supplier.py b/examples/tpch/q15_top_supplier.py new file mode 100644 index 00000000..7113e04f --- /dev/null +++ b/examples/tpch/q15_top_supplier.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 15: + +The Top Supplier Query finds the supplier who contributed the most to the overall revenue for parts +shipped during a given quarter of a given year. In case of a tie, the query lists all suppliers +whose contribution was equal to the maximum, presented in supplier number order. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, WindowFrame, col, lit, functions as F + +DATE = "1996-01-01" + +date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date()) +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval_3_months = lit(pa.scalar((0, 0, 90), type=pa.month_day_nano_interval())) + +# Load the dataframes we need + +ctx = SessionContext() + +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_suppkey", "l_shipdate", "l_extendedprice", "l_discount" +) +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", + "s_name", + "s_address", + "s_phone", +) + +# Limit line items to the quarter of interest +df_lineitem = df_lineitem.filter(col("l_shipdate") >= date_of_interest).filter( + col("l_shipdate") < date_of_interest + interval_3_months +) + +df = df_lineitem.aggregate( + [col("l_suppkey")], + [ + F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias( + "total_revenue" + ) + ], +) + +# Use a window function to find the maximum revenue across the entire dataframe +window_frame = WindowFrame("rows", None, None) +df = df.with_column( + "max_revenue", F.window("max", [col("total_revenue")], window_frame=window_frame) +) + +# Find all suppliers whose total revenue is the same as the maximum +df = df.filter(col("total_revenue") == col("max_revenue")) + +# Now that we know the supplier(s) with maximum revenue, get the rest of their information +# from the supplier table +df = df.join(df_supplier, (["l_suppkey"], ["s_suppkey"]), "inner") + +# Return only the colums requested +df = df.select_columns("s_suppkey", "s_name", "s_address", "s_phone", "total_revenue") + +# If we have more than one, sort by supplier number (suppkey) +df = df.sort(col("s_suppkey").sort()) + +df.show() diff --git a/examples/tpch/q16_part_supplier_relationship.py b/examples/tpch/q16_part_supplier_relationship.py new file mode 100644 index 00000000..5f941d5a --- /dev/null +++ b/examples/tpch/q16_part_supplier_relationship.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 16: + +The Parts/Supplier Relationship Query counts the number of suppliers who can supply parts that +satisfy a particular customer's requirements. The customer is interested in parts of eight +different sizes as long as they are not of a given type, not of a given brand, and not from a +supplier who has had complaints registered at the Better Business Bureau. Results must be presented +in descending count and ascending brand, type, and size. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +BRAND = "Brand#45" +TYPE_TO_IGNORE = "MEDIUM POLISHED" +SIZES_OF_INTEREST = [49, 14, 23, 45, 19, 3, 36, 9] + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns( + "p_partkey", "p_brand", "p_type", "p_size" +) +df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns( + "ps_suppkey", "ps_partkey" +) +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_comment" +) + +df_unwanted_suppliers = df_supplier.filter( + ~F.regexp_match(col("s_comment"), lit("Customer.?*Complaints")).is_null() +) + +# Remove unwanted suppliers +df_partsupp = df_partsupp.join( + df_unwanted_suppliers, (["ps_suppkey"], ["s_suppkey"]), "anti" +) + +# Select the parts we are interested in +df_part = df_part.filter(col("p_brand") == lit(BRAND)) +df_part = df_part.filter( + F.substr(col("p_type"), lit(0), lit(len(TYPE_TO_IGNORE) + 1)) != lit(TYPE_TO_IGNORE) +) + +# Python conversion of integer to literal casts it to int64 but the data for +# part size is stored as an int32, so perform a cast. Then check to find if the part +# size is within the array of possible sizes by checking the position of it is not +# null. +p_sizes = F.make_array(*[lit(s).cast(pa.int32()) for s in SIZES_OF_INTEREST]) +df_part = df_part.filter(~F.array_position(p_sizes, col("p_size")).is_null()) + +df = df_part.join(df_partsupp, (["p_partkey"], ["ps_partkey"]), "inner") + +df = df.select_columns("p_brand", "p_type", "p_size", "ps_suppkey").distinct() + +df = df.aggregate( + [col("p_brand"), col("p_type"), col("p_size")], + [F.count(col("ps_suppkey")).alias("supplier_cnt")], +) + +df = df.sort(col("supplier_cnt").sort(ascending=False)) + +df.show() diff --git a/examples/tpch/q17_small_quantity_order.py b/examples/tpch/q17_small_quantity_order.py new file mode 100644 index 00000000..aae238b2 --- /dev/null +++ b/examples/tpch/q17_small_quantity_order.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 17: + +The Small-Quantity-Order Revenue Query considers parts of a given brand and with a given container +type and determines the average lineitem quantity of such parts ordered for all orders (past and +pending) in the 7-year database. What would be the average yearly gross (undiscounted) loss in +revenue if orders for these parts with a quantity of less than 20% of this average were no longer +taken? + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, WindowFrame, col, lit, functions as F + +BRAND = "Brand#23" +CONTAINER = "MED BOX" + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns( + "p_partkey", "p_brand", "p_container" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_partkey", "l_quantity", "l_extendedprice" +) + +# Limit to the problem statement's brand and container types +df = df_part.filter(col("p_brand") == lit(BRAND)).filter( + col("p_container") == lit(CONTAINER) +) + +# Combine data +df = df.join(df_lineitem, (["p_partkey"], ["l_partkey"]), "inner") + +# Find the average quantity +window_frame = WindowFrame("rows", None, None) +df = df.with_column( + "avg_quantity", F.window("avg", [col("l_quantity")], window_frame=window_frame) +) + +df = df.filter(col("l_quantity") < lit(0.2) * col("avg_quantity")) + +# Compute the total +df = df.aggregate([], [F.sum(col("l_extendedprice")).alias("total")]) + +# Divide by number of years in the problem statement to get average +df = df.select((col("total") / lit(7.0)).alias("avg_yearly")) + +df.show() diff --git a/examples/tpch/q18_large_volume_customer.py b/examples/tpch/q18_large_volume_customer.py new file mode 100644 index 00000000..96ca08ff --- /dev/null +++ b/examples/tpch/q18_large_volume_customer.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 18: + +The Large Volume Customer Query finds a list of the top 100 customers who have ever placed large +quantity orders. The query lists the customer name, customer key, the order key, date and total +price and the quantity for the order. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, col, lit, functions as F + +QUANTITY = 300 + +# Load the dataframes we need + +ctx = SessionContext() + +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_custkey", "c_name" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_custkey", "o_orderdate", "o_totalprice" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_orderkey", "l_quantity", "l_extendedprice" +) + +df = df_lineitem.aggregate( + [col("l_orderkey")], [F.sum(col("l_quantity")).alias("total_quantity")] +) + +# Limit to orders in which the total quantity is above a threshold +df = df.filter(col("total_quantity") > lit(QUANTITY)) + +# We've identified the orders of interest, now join the additional data +# we are required to report on +df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), "inner") +df = df.join(df_customer, (["o_custkey"], ["c_custkey"]), "inner") + +df = df.select_columns( + "c_name", "c_custkey", "o_orderkey", "o_orderdate", "o_totalprice", "total_quantity" +) + +df = df.sort(col("o_totalprice").sort(ascending=False), col("o_orderdate").sort()) + +df.show() diff --git a/examples/tpch/q19_discounted_revenue.py b/examples/tpch/q19_discounted_revenue.py new file mode 100644 index 00000000..20ad48a7 --- /dev/null +++ b/examples/tpch/q19_discounted_revenue.py @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 19: + +The Discounted Revenue query finds the gross discounted revenue for all orders for three different +types of parts that were shipped by air and delivered in person. Parts are selected based on the +combination of specific brands, a list of containers, and a range of sizes. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +import pyarrow as pa +from datafusion import SessionContext, col, lit, udf, functions as F + +items_of_interest = { + "Brand#12": { + "min_quantity": 1, + "containers": ["SM CASE", "SM BOX", "SM PACK", "SM PKG"], + "max_size": 5, + }, + "Brand#23": { + "min_quantity": 10, + "containers": ["MED BAG", "MED BOX", "MED PKG", "MED PACK"], + "max_size": 10, + }, + "Brand#34": { + "min_quantity": 20, + "containers": ["LG CASE", "LG BOX", "LG PACK", "LG PKG"], + "max_size": 15, + }, +} + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns( + "p_partkey", "p_brand", "p_container", "p_size" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_partkey", + "l_quantity", + "l_shipmode", + "l_shipinstruct", + "l_extendedprice", + "l_discount", +) + +# These limitations apply to all line items, so go ahead and do them first + +df = df_lineitem.filter(col("l_shipinstruct") == lit("DELIVER IN PERSON")) + +# Small note: The data generated uses "REG AIR" but the spec says "AIR REG" +df = df.filter( + (col("l_shipmode") == lit("AIR")) | (col("l_shipmode") == lit("REG AIR")) +) + +df = df.join(df_part, (["l_partkey"], ["p_partkey"]), "inner") + + +# Create the user defined function (UDF) definition that does the work +def is_of_interest( + brand_arr: pa.Array, + container_arr: pa.Array, + quantity_arr: pa.Array, + size_arr: pa.Array, +) -> pa.Array: + """ + The purpose of this function is to demonstrate how a UDF works, taking as input a pyarrow Array + and generating a resultant Array. The length of the inputs should match and there should be the + same number of rows in the output. + """ + result = [] + for idx, brand in enumerate(brand_arr): + brand = brand.as_py() + if brand in items_of_interest: + values_of_interest = items_of_interest[brand] + + container_matches = ( + container_arr[idx].as_py() in values_of_interest["containers"] + ) + + quantity = quantity_arr[idx].as_py() + quantity_matches = ( + values_of_interest["min_quantity"] + <= quantity + <= values_of_interest["min_quantity"] + 10 + ) + + size = size_arr[idx].as_py() + size_matches = 1 <= size <= values_of_interest["max_size"] + + result.append(container_matches and quantity_matches and size_matches) + else: + result.append(False) + + return pa.array(result) + + +# Turn the above function into a UDF that DataFusion can understand +is_of_interest_udf = udf( + is_of_interest, + [pa.utf8(), pa.utf8(), pa.float32(), pa.int32()], + pa.bool_(), + "stable", +) + +# Filter results using the above UDF +df = df.filter( + is_of_interest_udf( + col("p_brand"), col("p_container"), col("l_quantity"), col("p_size") + ) +) + +df = df.aggregate( + [], + [F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias("revenue")], +) + +df.show() diff --git a/examples/tpch/q20_potential_part_promotion.py b/examples/tpch/q20_potential_part_promotion.py new file mode 100644 index 00000000..09686db0 --- /dev/null +++ b/examples/tpch/q20_potential_part_promotion.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 20: + +The Potential Part Promotion query identifies suppliers who have an excess of a given part +available; an excess is defined to be more than 50% of the parts like the given part that the +supplier shipped in a given year for a given nation. Only parts whose names share a certain naming +convention are considered. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datetime import datetime +import pyarrow as pa +from datafusion import SessionContext, col, lit, functions as F + +COLOR_OF_INTEREST = "forest" +DATE_OF_INTEREST = "1994-01-01" +NATION_OF_INTEREST = "CANADA" + +# Load the dataframes we need + +ctx = SessionContext() + +df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", "p_name") +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_shipdate", "l_partkey", "l_suppkey", "l_quantity" +) +df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns( + "ps_partkey", "ps_suppkey", "ps_availqty" +) +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_address", "s_name", "s_nationkey" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name" +) + +date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date() + +# Note: this is a hack on setting the values. It should be set differently once +# https://github.com/apache/datafusion-python/issues/665 is resolved. +interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval()) + +# Filter down dataframes +df_nation = df_nation.filter(col("n_name") == lit(NATION_OF_INTEREST)) +df_part = df_part.filter( + F.substr(col("p_name"), lit(0), lit(len(COLOR_OF_INTEREST) + 1)) + == lit(COLOR_OF_INTEREST) +) + +df = df_lineitem.filter(col("l_shipdate") >= lit(date)).filter( + col("l_shipdate") < lit(date) + lit(interval) +) + +# This will filter down the line items to the parts of interest +df = df.join(df_part, (["l_partkey"], ["p_partkey"]), "inner") + +# Compute the total sold and limit ourselves to indivdual supplier/part combinations +df = df.aggregate( + [col("l_partkey"), col("l_suppkey")], [F.sum(col("l_quantity")).alias("total_sold")] +) + +df = df.join( + df_partsupp, (["l_partkey", "l_suppkey"], ["ps_partkey", "ps_suppkey"]), "inner" +) + +# Find cases of excess quantity +df.filter(col("ps_availqty") > lit(0.5) * col("total_sold")) + +# We could do these joins earlier, but now limit to the nation of interest suppliers +df = df.join(df_supplier, (["ps_suppkey"], ["s_suppkey"]), "inner") +df = df.join(df_nation, (["s_nationkey"], ["n_nationkey"]), "inner") + +# Restrict to the requested data per the problem statement +df = df.select_columns("s_name", "s_address") + +df = df.sort(col("s_name").sort()) + +df.show() diff --git a/examples/tpch/q21_suppliers_kept_orders_waiting.py b/examples/tpch/q21_suppliers_kept_orders_waiting.py new file mode 100644 index 00000000..2f58d6e7 --- /dev/null +++ b/examples/tpch/q21_suppliers_kept_orders_waiting.py @@ -0,0 +1,114 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 21: + +The Suppliers Who Kept Orders Waiting query identifies suppliers, for a given nation, whose product +was part of a multi-supplier order (with current status of 'F') where they were the only supplier +who failed to meet the committed delivery date. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, col, lit, functions as F + +NATION_OF_INTEREST = "SAUDI ARABIA" + +# Load the dataframes we need + +ctx = SessionContext() + +df_orders = ctx.read_parquet("data/orders.parquet").select_columns( + "o_orderkey", "o_orderstatus" +) +df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns( + "l_orderkey", "l_receiptdate", "l_commitdate", "l_suppkey" +) +df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns( + "s_suppkey", "s_name", "s_nationkey" +) +df_nation = ctx.read_parquet("data/nation.parquet").select_columns( + "n_nationkey", "n_name" +) + +# Limit to suppliers in the nation of interest +df_suppliers_of_interest = df_nation.filter(col("n_name") == lit(NATION_OF_INTEREST)) + +df_suppliers_of_interest = df_suppliers_of_interest.join( + df_supplier, (["n_nationkey"], ["s_nationkey"]), "inner" +) + +# Find the failed orders and all their line items +df = df_orders.filter(col("o_orderstatus") == lit("F")) + +df = df_lineitem.join(df, (["l_orderkey"], ["o_orderkey"]), "inner") + +# Identify the line items for which the order is failed due to. +df = df.with_column( + "failed_supp", + F.case(col("l_receiptdate") > col("l_commitdate")) + .when(lit(True), col("l_suppkey")) + .end(), +) + +# There are other ways we could do this but the purpose of this example is to work with rows where +# an element is an array of values. In this case, we will create two columns of arrays. One will be +# an array of all of the suppliers who made up this order. That way we can filter the dataframe for +# only orders where this array is larger than one for multiple supplier orders. The second column +# is all of the suppliers who failed to make their commitment. We can filter the second column for +# arrays with size one. That combination will give us orders that had multiple suppliers where only +# one failed. Use distinct=True in the blow aggregation so we don't get multipe line items from the +# same supplier reported in either array. +df = df.aggregate( + [col("o_orderkey")], + [ + F.array_agg(col("l_suppkey"), distinct=True).alias("all_suppliers"), + F.array_agg(col("failed_supp"), distinct=True).alias("failed_suppliers"), + ], +) + +# Remove the null entries that will get returned by array_agg so we can test to see where we only +# have a single failed supplier in a multiple supplier order +df = df.with_column( + "failed_suppliers", F.array_remove(col("failed_suppliers"), lit(None)) +) + +# This is the check described above which will identify single failed supplier in a multiple +# supplier order. +df = df.filter(F.array_length(col("failed_suppliers")) == lit(1)).filter( + F.array_length(col("all_suppliers")) > lit(1) +) + +# Since we have an array we know is exactly one element long, we can extract that single value. +df = df.select( + col("o_orderkey"), F.array_element(col("failed_suppliers"), lit(1)).alias("suppkey") +) + +# Join to the supplier of interest list for the nation of interest +df = df.join(df_suppliers_of_interest, (["suppkey"], ["s_suppkey"]), "inner") + +# Count how many orders that supplier is the only failed supplier for +df = df.aggregate([col("s_name")], [F.count(col("o_orderkey")).alias("numwait")]) + +# Return in descending order +df = df.sort(col("numwait").sort(ascending=False)) + +df = df.limit(100) + +df.show() diff --git a/examples/tpch/q22_global_sales_opportunity.py b/examples/tpch/q22_global_sales_opportunity.py new file mode 100644 index 00000000..d2d0c5a0 --- /dev/null +++ b/examples/tpch/q22_global_sales_opportunity.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H Problem Statement Query 22: + +This query counts how many customers within a specific range of country codes have not placed +orders for 7 years but who have a greater than average “positive” account balance. It also reflects +the magnitude of that balance. Country code is defined as the first two characters of c_phone. + +The above problem statement text is copyrighted by the Transaction Processing Performance Council +as part of their TPC Benchmark H Specification revision 2.18.0. +""" + +from datafusion import SessionContext, WindowFrame, col, lit, functions as F + +NATION_CODE = 13 + +# Load the dataframes we need + +ctx = SessionContext() + +df_customer = ctx.read_parquet("data/customer.parquet").select_columns( + "c_phone", "c_acctbal", "c_custkey" +) +df_orders = ctx.read_parquet("data/orders.parquet").select_columns("o_custkey") + +# The nation code is a two digit number, but we need to convert it to a string literal +nation_code = lit(str(NATION_CODE)) + +# Use the substring operation to extract the first two charaters of the phone number +df = df_customer.with_column("cntrycode", F.substr(col("c_phone"), lit(0), lit(3))) + +# Limit our search to customers with some balance and in the country code above +df = df.filter(col("c_acctbal") > lit(0.0)) +df = df.filter(nation_code == col("cntrycode")) + +# Compute the average balance. By default, the window frame is from unbounded preceeding to the +# current row. We want our frame to cover the entire data frame. +window_frame = WindowFrame("rows", None, None) +df = df.with_column( + "avg_balance", F.window("avg", [col("c_acctbal")], window_frame=window_frame) +) + +# Limit results to customers with above average balance +df = df.filter(col("c_acctbal") > col("avg_balance")) + +# Limit results to customers with no orders +df = df.join(df_orders, (["c_custkey"], ["o_custkey"]), "anti") + +# Count up the customers and the balances +df = df.aggregate( + [col("cntrycode")], + [ + F.count(col("c_custkey")).alias("numcust"), + F.sum(col("c_acctbal")).alias("totacctbal"), + ], +) + +df = df.sort(col("cntrycode").sort()) + +df.show()