
Memory reservation & metrics for cross join #5339

Merged (3 commits into apache:main, Feb 28, 2023)

Conversation

korowa (Contributor) commented Feb 19, 2023

Which issue does this PR close?

Closes #5162.
Part of #5220.

Rationale for this change

Memory management for the first of the four join operators.

What changes are included in this PR?

  • CrossJoinExec.reservation -- the actual reservation, used in the load_left_input closure to reserve memory for build-side data, and in the stream polling function to free that memory once the probe-side stream has been exhausted. Memory is also freed by the reservation's destructor (e.g. when a parent operator has a LIMIT), so storing the reservation as a stream attribute is not strictly required, but calling free explicitly seems like the more correct approach (a simplified sketch of this reserve/free lifecycle follows this list).
  • BuildProbeJoinMetrics -- CrossJoinExec currently lacks metrics that could be exposed in explain analyze, so the counters from CrossJoinStream have been moved into a BuildProbeJoinMetrics structure. It could also be reused in follow-ups for NestedLoopJoinExec and HashJoinExec, since all three share essentially the same "build & probe" approach and their metrics could be similar.
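
To make the reserve/free lifecycle above concrete, here is a minimal, self-contained sketch. The Pool and Reservation types (and all sizes) are invented stand-ins for this example, not DataFusion's actual MemoryPool / MemoryReservation API; they only mirror the pattern of growing a reservation per buffered build-side batch, failing with a "Resources exhausted"-style error on overallocation, and releasing memory either explicitly or via the destructor.

// Illustrative sketch only: Pool and Reservation are simplified stand-ins invented for
// this example; they are not DataFusion's actual MemoryPool / MemoryReservation types.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy memory pool with a hard limit.
struct Pool {
    limit: usize,
    used: AtomicUsize,
}

impl Pool {
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let prev = self.used.fetch_add(bytes, Ordering::SeqCst);
        if prev + bytes > self.limit {
            self.used.fetch_sub(bytes, Ordering::SeqCst);
            return Err(format!("Resources exhausted: cannot reserve {bytes} bytes"));
        }
        Ok(())
    }
    fn shrink(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::SeqCst);
    }
}

/// Toy reservation: grown while the build side is collected, freed once the probe
/// side is exhausted, or on drop (e.g. when a parent LIMIT cuts execution short).
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        self.pool.try_grow(bytes)?;
        self.size += bytes;
        Ok(())
    }
    fn free(&mut self) {
        self.pool.shrink(self.size);
        self.size = 0;
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Safety net: release anything still held if free() was never called.
        self.free();
    }
}

fn main() {
    let pool = Arc::new(Pool { limit: 1024, used: AtomicUsize::new(0) });
    let mut reservation = Reservation { pool: Arc::clone(&pool), size: 0 };

    // Build phase: account for each buffered build-side batch.
    reservation.try_grow(400).unwrap();
    reservation.try_grow(400).unwrap();

    // A third 400-byte batch exceeds the 1024-byte limit: the operator fails with a
    // "Resources exhausted"-style error instead of OOM-ing.
    assert!(reservation.try_grow(400).is_err());

    // Probe side exhausted: release the build-side memory explicitly.
    reservation.free();
    assert_eq!(pool.used.load(Ordering::SeqCst), 0);
}

In the PR itself, the grow happens inside the load_left_input closure and the free happens from the stream's polling code, with the reservation's destructor as a fallback.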

Are these changes tested?

Test cases covering normal cross join execution and asserting the error raised on an overallocation attempt have been added.

Are there any user-facing changes?

Not really, but this should help avoid OOMs by failing with a Resources exhausted error instead.

The github-actions bot added the core (Core DataFusion crate) label on Feb 19, 2023
alamb (Contributor) commented Feb 26, 2023

Thank you @korowa -- I plan to review this carefully tomorrow

alamb (Contributor) left a comment

Thank you @korowa -- this looks great

I think we should also add a SQL-level test for the memory limit code; I think it should be easy to add one to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs. I am happy to add such a test as a follow-on PR if you prefer.

I also tried the metrics out in datafusion-cli and it looks good to me

❯ create or replace table t as select column1 as value, column2 as time from (select * from (values
  (1, timestamp '2022-01-01 00:00:30'),
  (2, timestamp '2022-01-01 01:00:10'),
  (3, timestamp '2022-01-02 00:00:20')
) as sq) as sq;
0 rows in set. Query took 0.006 seconds.

❯ explain analyze select * from t t1 CROSS JOIN (select * from t where value < 3) as t2;
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                 |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=6, elapsed_compute=53.015µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                |
|                   |   ProjectionExec: expr=[value@0 as value, time@1 as time, value@2 as value, time@3 as time], metrics=[output_rows=6, elapsed_compute=10.477µs, spill_count=0, spilled_bytes=0, mem_used=0]           |
|                   |     CrossJoinExec, metrics=[output_rows=6, input_rows=6, build_input_rows=3, input_batches=1, build_input_batches=1, output_batches=3, build_mem_used=496, build_time=107.605µs, join_time=64.668µs] |
|                   |       MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                      |
|                   |       ProjectionExec: expr=[value@0 as value, time@1 as time], metrics=[output_rows=2, elapsed_compute=6.424µs, spill_count=0, spilled_bytes=0, mem_used=0]                                          |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=69.635µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                   |
|                   |           FilterExec: value@0 < 3, metrics=[output_rows=2, elapsed_compute=147.686µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                    |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[fetch_time=15.452µs, repart_time=1ns, send_time=9.12µs]                                                  |
|                   |               MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                              |
|                   |                                                                                                                                                                                                      |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.lock().try_grow(batch_size)?;
// Update metrics

❤️

@@ -434,15 +453,7 @@ impl CrossJoinStream {
Some(result)
}
other => {
debug!(

I think it is a nice improvement that the metrics are now included in things like EXPLAIN ANALYZE

korowa (Contributor, Author) commented Feb 28, 2023

Thank you for the review @alamb!

I think we should also add a SQL-level test for the memory limit code; I think it should be easy to add one to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs. I am happy to add such a test as a follow-on PR if you prefer.

It seems like the new test_overallocation is somewhat of a duplicate of the memory_limit tests -- so, I guess, it would be better to add a new memory_limit test in this PR instead of the existing one.

UPD: the memory_limit tests depend on planner/optimizer/etc. behaviour, so simply adding a test alongside the existing (or just-added) CrossJoinExec tests, as you've proposed, seems like the most proper option. I've also modified test_overallocation to get rid of the hardcoded record batch size in the error message. A rough sketch of such a SQL-level check is below.
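
For reference, a rough sketch of what such a SQL-level memory_limit-style check could look like. This is not the exact test from this PR or from memory_limit.rs; the table contents, the 4 KB limit, and the test name are made up for illustration, and the DataFusion API calls are written from memory for roughly this release, so signatures may differ slightly.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::test]
async fn cross_join_overallocation_errors_cleanly() -> Result<()> {
    // A build side large enough that buffering it exceeds the tiny pool below.
    let col: ArrayRef = Arc::new(Int64Array::from_iter_values(0_i64..10_000));
    let batch = RecordBatch::try_from_iter(vec![("v", col)])?;
    let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

    // Limit the memory pool to 4 KB so the build-side reservation must fail.
    let runtime = RuntimeEnv::new(RuntimeConfig::new().with_memory_limit(4 * 1024, 1.0))?;
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));
    ctx.register_table("t", Arc::new(table))?;

    // The cross join should fail with a Resources exhausted error rather than OOM.
    let err = ctx
        .sql("SELECT * FROM t AS t1 CROSS JOIN t AS t2")
        .await?
        .collect()
        .await
        .expect_err("query should fail instead of OOM-ing");

    assert!(err.to_string().contains("Resources exhausted"));
    Ok(())
}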

alamb (Contributor) left a comment

Thanks @korowa

alamb merged commit ea3b965 into apache:main on Feb 28, 2023
ursabot commented Feb 28, 2023

Benchmark runs are scheduled for baseline = 20d08ab and contender = ea3b965. ea3b965 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Successfully merging this pull request may close these issues:

Ignoring of memory-pool limits & OOM on large cartesian-product join