fix: Limit together with pushdown_filters (apache#13788)
* fix: Limit together with pushdown_filters

* Fix format

* Address new comments

* Fix testing case to hit the problem
zhuqi-lucas authored Dec 16, 2024
1 parent 59410ea commit fe53eaf
Showing 2 changed files with 79 additions and 1 deletion.
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -843,8 +843,13 @@ impl TableProvider for ListingTable {
});
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();

+// We should not limit the number of partitioned files to scan if there are filters and limit
+// at the same time. This is because the limit should be applied after the filters are applied.
+let statistic_file_limit = if filters.is_empty() { limit } else { None };
+
let (mut partitioned_file_lists, statistics) = self
-.list_files_for_scan(session_state, &partition_filters, limit)
+.list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
.await?;

// if no files need to be read, return an `EmptyExec`
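A minimal, self-contained sketch of the gating rule added above (the function name and shape are illustrative, not DataFusion's API): the listing-time limit is forwarded only when no filters remain, because a pushed-down filter can discard rows and make per-file row counts an unsafe basis for skipping files.

```rust
// Hypothetical, simplified version of the rule introduced in table.rs above:
// forward the limit to the file-listing step only when there are no filters,
// because the limit must apply to rows that survive filtering.
fn statistic_file_limit(filters_is_empty: bool, limit: Option<usize>) -> Option<usize> {
    if filters_is_empty {
        limit
    } else {
        None
    }
}

fn main() {
    // No filters: the limit can safely bound how many files are listed.
    assert_eq!(statistic_file_limit(true, Some(1)), Some(1));
    // Filters present: list all candidate files and apply the limit later.
    assert_eq!(statistic_file_limit(false, Some(1)), None);
}
```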
73 changes: 73 additions & 0 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
@@ -122,3 +122,76 @@ logical_plan

statement ok
drop table d;


# Test push down filter with limit for parquet
statement ok
set datafusion.execution.parquet.pushdown_filters = true;

# This setting is also required to make DataFusion skip the second file due to a "sufficient" number of rows
statement ok
set datafusion.execution.collect_statistics = true;

# Create a table as a data source
statement ok
CREATE TABLE src_table (
part_key INT,
value INT
) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);


# More than one record matches the filter, so we can check that `limit 1` is actually applied.
# Set up 3 files, i.e., as many as there are partitions:

# File 1:
query I
COPY (SELECT * FROM src_table where part_key = 1)
TO 'test_files/scratch/parquet/test_filter_with_limit/part-0.parquet'
STORED AS PARQUET;
----
3

# File 2:
query I
COPY (SELECT * FROM src_table where part_key = 2)
TO 'test_files/scratch/parquet/test_filter_with_limit/part-1.parquet'
STORED AS PARQUET;
----
4

# File 3:
query I
COPY (SELECT * FROM src_table where part_key = 3)
TO 'test_files/scratch/parquet/test_filter_with_limit/part-2.parquet'
STORED AS PARQUET;
----
3

statement ok
CREATE EXTERNAL TABLE test_filter_with_limit
(
part_key INT,
value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_filter_with_limit/';

query TT
explain select * from test_filter_with_limit where value = 2 limit 1;
----
logical_plan
01)Limit: skip=0, fetch=1
02)--TableScan: test_filter_with_limit projection=[part_key, value], full_filters=[test_filter_with_limit.value = Int32(2)], fetch=1

query II
select * from test_filter_with_limit where value = 2 limit 1;
----
2 2

# Tear down test_filter_with_limit table:
statement ok
DROP TABLE test_filter_with_limit;

# Tear down src_table table:
statement ok
DROP TABLE src_table;
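To see why the test above needs both settings and all three files, here is an illustrative, self-contained sketch of the pre-fix behaviour (not DataFusion code; file names and row counts are taken from the COPY statements above): pruning files by raw statistics before the `value = 2` filter runs keeps only part-0.parquet, which has no matching rows, so `limit 1` could return nothing.

```rust
// Illustrative model of the pre-fix behaviour, not DataFusion code.
fn main() {
    // (file, total rows, rows matching `value = 2`), per the COPY statements above.
    let files = [
        ("part-0.parquet", 3, 0),
        ("part-1.parquet", 4, 2),
        ("part-2.parquet", 3, 0),
    ];
    let limit = 1;

    // Pre-fix: stop listing files once the *unfiltered* row-count statistics cover the limit.
    let mut rows_so_far = 0;
    let mut pruned = Vec::new();
    for (name, total, _) in files {
        if rows_so_far >= limit {
            break; // "enough" rows on paper, but the filter has not run yet
        }
        pruned.push(name);
        rows_so_far += total;
    }
    assert_eq!(pruned, ["part-0.parquet"]);

    // Matching rows in the pruned file set: zero, so
    // `select * ... where value = 2 limit 1` could come back empty instead of one row.
    let matching: i32 = files
        .iter()
        .filter(|(name, _, _)| pruned.contains(name))
        .map(|&(_, _, m)| m)
        .sum();
    assert_eq!(matching, 0);
}
```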
