Skip to content

Commit

Permalink
Enable configurable display of partition sizes in the explain stateme…
Browse files Browse the repository at this point in the history
…nt (apache#9474)

* quick fixes

Signed-off-by: jayzhan211 <[email protected]>

* remove one in load

Signed-off-by: jayzhan211 <[email protected]>

* fix others

Signed-off-by: jayzhan211 <[email protected]>

* revert

Signed-off-by: jayzhan211 <[email protected]>

* config

Signed-off-by: jayzhan211 <[email protected]>

* with_show_sizes

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Mar 6, 2024
1 parent ea01e56 commit cc8a41a
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 11 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ config_namespace! {
/// When set to true, the explain statement will print operator statistics
/// for physical plans
pub show_statistics: bool, default = false

/// When set to true, the explain statement will print the partition sizes
pub show_sizes: bool, default = true
}
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,13 @@ impl TableProvider for MemTable {
let inner_vec = arc_inner_vec.read().await;
partitions.push(inner_vec.clone())
}

let mut exec =
MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;

let show_sizes = state.config_options().explain.show_sizes;
exec = exec.with_show_sizes(show_sizes);

// add sort information if present
let sort_order = self.sort_order.lock();
if !sort_order.is_empty() {
Expand Down
23 changes: 18 additions & 5 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct MemoryExec {
// Sort information: one or more equivalent orderings
sort_information: Vec<LexOrdering>,
cache: PlanProperties,
/// Whether partition sizes are included when this plan is displayed
show_sizes: bool,
}

impl fmt::Debug for MemoryExec {
Expand Down Expand Up @@ -85,11 +87,15 @@ impl DisplayAs for MemoryExec {
})
.unwrap_or_default();

write!(
f,
"MemoryExec: partitions={}, partition_sizes={partition_sizes:?}{output_ordering}",
partition_sizes.len(),
)
if self.show_sizes {
write!(
f,
"MemoryExec: partitions={}, partition_sizes={partition_sizes:?}{output_ordering}",
partition_sizes.len(),
)
} else {
write!(f, "MemoryExec: partitions={}", partition_sizes.len(),)
}
}
}
}
Expand Down Expand Up @@ -161,9 +167,16 @@ impl MemoryExec {
projection,
sort_information: vec![],
cache,
show_sizes: true,
})
}

/// Sets the `show_sizes` flag and returns the updated plan.
///
/// When `show_sizes` is `true`, the display output includes
/// `partition_sizes=[...]`; when it is `false`, only the partition count
/// is printed.
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
self.show_sizes = show_sizes;
self
}

/// Returns a borrowed slice over the `partitions` field; each element is
/// a `Vec<RecordBatch>`.
pub fn partitions(&self) -> &[Vec<RecordBatch>] {
&self.partitions
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/arrow_typeof.slt
Original file line number Diff line number Diff line change
Expand Up @@ -421,4 +421,4 @@ FixedSizeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0
query ?
select arrow_cast([1, 2, 3], 'FixedSizeList(3, Int64)');
----
[1, 2, 3]
[1, 2, 3]
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/clickbench.slt
Original file line number Diff line number Diff line change
Expand Up @@ -274,5 +274,5 @@ query PI
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
----

query
drop table hits;
statement ok
drop table hits;
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
datafusion.explain.show_sizes true
datafusion.explain.show_statistics false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.default_filter_selectivity 20
Expand Down Expand Up @@ -281,6 +282,7 @@ datafusion.execution.target_partitions 7 Number of partitions for query executio
datafusion.execution.time_zone +00:00 The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour
datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
datafusion.explain.show_sizes true When set to true, the explain statement will print the partition sizes
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
Expand Down
10 changes: 8 additions & 2 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 WHERE a > 3 LIMIT 3 OFFSET 6);
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
--Projection:
--Projection:
----Limit: skip=6, fetch=3
------Filter: t1.a > Int32(3)
--------TableScan: t1 projection=[a]
Expand All @@ -387,6 +387,9 @@ CREATE TABLE t1000 (i BIGINT) AS
WITH t AS (VALUES (0), (0), (0), (0), (0), (0), (0), (0), (0), (0))
SELECT ROW_NUMBER() OVER (PARTITION BY t1.column1) FROM t t1, t t2, t t3;

statement ok
set datafusion.explain.show_sizes = false;

# verify that there are multiple partitions in the input (i.e. MemoryExec says
# there are 4 partitions) so that this tests multi-partition limit.
query TT
Expand All @@ -400,7 +403,10 @@ AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[]
--CoalesceBatchesExec: target_batch_size=8192
----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4
------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[]
--------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1]
--------MemoryExec: partitions=4

statement ok
set datafusion.explain.show_sizes = true;

query I
SELECT i FROM t1000 ORDER BY i DESC LIMIT 3;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1582,4 +1582,4 @@ select a from t;
----

statement ok
set datafusion.optimizer.max_passes=3;
set datafusion.optimizer.max_passes=3;
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
| datafusion.explain.show_sizes | true | When set to true, the explain statement will print the partition sizes |
| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type |
| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) |
| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. |

0 comments on commit cc8a41a

Please sign in to comment.