Add spilling in SortMergeJoin #9359

Closed
Tracked by #9846
viirya opened this issue Feb 27, 2024 · 13 comments · Fixed by #11218
Labels
enhancement New feature or request

Comments

@viirya
Member

viirya commented Feb 27, 2024

Is your feature request related to a problem or challenge?

SortMergeJoin can run out of memory when it needs extra memory to hold polled buffered batches. We can consider adding spilling support there to make the operator resilient to memory pressure.

Describe the solution you'd like

Add spilling support in SortMergeJoin.

Describe alternatives you've considered

No response

Additional context

No response

@Omega359
Contributor

Possibly related: #8398, #9170, #5108, #1599

@comphead
Contributor

comphead commented Apr 2, 2024

After reading some code and the already opened issues on the same topic, it's probably possible to summarize what's needed for a POC at least:

  • try to use the external memory sorter to sort streamed and buffered batches for the sort phases
  • MemoryReservation is already in the SMJ implementation, and the merge phase calls try_grow on the buffered side, so the naive approach is to spill buffered batches and read them back from disk in smaller chunks; we can play with that
  • run a test query and profile memory to see other places where spilling can be useful
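
As a rough illustration of the second bullet (spill buffered data, then read it back in smaller chunks), here is a minimal std-only sketch. It is not DataFusion code: `spill_to_disk`, `read_back_in_chunks`, and the flat little-endian `i64` file layout are all hypothetical stand-ins for spilling Arrow record batches.

```rust
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::Path;

/// Hypothetical helper: write buffered join keys to a spill file
/// as consecutive little-endian i64 values.
fn spill_to_disk(path: &Path, keys: &[i64]) -> io::Result<()> {
    let mut writer = BufWriter::new(File::create(path)?);
    for key in keys {
        writer.write_all(&key.to_le_bytes())?;
    }
    writer.flush()
}

/// Hypothetical helper: read the spill file back, passing at most
/// `chunk_len` keys to `on_chunk` at a time so that only one chunk
/// has to be resident in memory.
fn read_back_in_chunks(
    path: &Path,
    chunk_len: usize,
    mut on_chunk: impl FnMut(&[i64]),
) -> io::Result<()> {
    assert!(chunk_len > 0, "chunk_len must be positive");
    let mut reader = BufReader::new(File::open(path)?);
    let mut chunk = Vec::with_capacity(chunk_len);
    let mut buf = [0u8; 8];
    loop {
        match reader.read_exact(&mut buf) {
            Ok(()) => chunk.push(i64::from_le_bytes(buf)),
            // End of file (a truncated trailing record is also treated as the end).
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        if chunk.len() == chunk_len {
            on_chunk(&chunk);
            chunk.clear();
        }
    }
    // Hand over the final, possibly shorter, chunk.
    if !chunk.is_empty() {
        on_chunk(&chunk);
    }
    Ok(())
}
```

With ten spilled keys and `chunk_len = 4`, the callback sees chunks of 4, 4, and 2 keys. A real implementation would presumably spill whole record batches and size the chunks against whatever the reservation can still grow by.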

@comphead
Contributor

comphead commented Apr 2, 2024

I'll start by creating a test query that fails on memory exhaustion.

@comphead
Contributor

comphead commented Apr 5, 2024

Related to #9846

@comphead
Contributor

comphead commented Apr 7, 2024

I think I managed to find the test query.
The first observation is that SMJ uses the external sorter, but the spill doesn't work. I will dig into it.

Resources exhausted: Failed to allocate additional 1048576 bytes for ExternalSorterMerge[0] with 0 bytes already allocated - maximum available is 1043520

This suggests that ExternalSorter is used but spilling is not.

Playing with the sort parameters, I managed to make the SMJ merge phase complain about memory exhaustion:

./datafusion-cli/target/debug/datafusion-cli -m 1m
DataFusion CLI v37.0.0
❯ set datafusion.optimizer.prefer_hash_join = false;
0 row(s) fetched. 
Elapsed 0.003 seconds.

❯ set datafusion.execution.sort_in_place_threshold_bytes = 104857;
0 row(s) fetched. 
Elapsed 0.001 seconds.

❯ set datafusion.execution.sort_spill_reservation_bytes = 104857;
0 row(s) fetched. 
Elapsed 0.000 seconds.

❯ select * from (select unnest(range(0, 10000)) id) t inner join (select unnest(range(0, 100000)) id) t1 on t.id = t1.id;
Resources exhausted: Failed to allocate additional 165208 bytes for SMJStream[5] with 0 bytes already allocated - maximum available is 65456

@comphead
Contributor

comphead commented Apr 7, 2024

My next steps

  • figure out why ExternalSorterMerge doesn't use spill
  • Do a spill in SMJ

@westonpace
Member

I can reproduce this pretty reliably in lance if it's any help. I run SortExec on a column of 100 million strings (each ~30 bytes long) and I have a 100MiB fair pool and it triggers in about 5 minutes. Let me know if there is anything I can do to assist you.

@milenkovicm
Contributor

milenkovicm commented Apr 12, 2024

FWIW, I believe I was looking at this problem some time ago. If I remember correctly, the issue was that one of the memory consumers, presumably ExternalSorterMerge, was returning all the memory it had allocated, only to ask for a new allocation of similar size very soon after. A MemoryConsumer returns all of its memory when dropped.

My suspects are:

https://github.com/apache/arrow-datafusion/blob/a5cf0b8902ae55b81ac86b875c7e94cf1bdc205d/datafusion/physical-plan/src/sorts/sort.rs#L263

or

https://github.com/apache/arrow-datafusion/blob/a5cf0b8902ae55b81ac86b875c7e94cf1bdc205d/datafusion/physical-plan/src/sorts/sort.rs#L359

or somewhere around it.

The line

Resources exhausted: Failed to allocate additional 165208 bytes for SMJStream[5] with 0 bytes already allocated - maximum available is 65456

may back my claim.

I can't find more information; it looks like that branch is MIA.
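
The suspected sequence (a consumer hands back everything it holds, then asks for a similar amount shortly after, by which time another consumer has taken part of it) can be replayed with a toy model. This is only a sketch under assumptions: `ToyPool` and `ToyReservation` are hypothetical std-only types, not DataFusion's MemoryPool or MemoryReservation, and the byte counts and consumer names are made up. The error text just imitates the format of the message quoted above.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy greedy pool: first come, first served, with a hard byte limit.
struct ToyPool {
    limit: usize,
    used: Cell<usize>,
}

impl ToyPool {
    fn new(limit: usize) -> Rc<Self> {
        Rc::new(ToyPool { limit, used: Cell::new(0) })
    }

    fn available(&self) -> usize {
        self.limit - self.used.get()
    }
}

/// Toy reservation that returns all of its bytes to the pool when dropped.
struct ToyReservation {
    pool: Rc<ToyPool>,
    name: &'static str,
    size: usize,
}

impl ToyReservation {
    fn new(pool: &Rc<ToyPool>, name: &'static str) -> Self {
        ToyReservation { pool: Rc::clone(pool), name, size: 0 }
    }

    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let available = self.pool.available();
        if bytes > available {
            return Err(format!(
                "Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}",
                bytes, self.name, self.size, available
            ));
        }
        self.pool.used.set(self.pool.used.get() + bytes);
        self.size += bytes;
        Ok(())
    }
}

impl Drop for ToyReservation {
    fn drop(&mut self) {
        // Everything held by this reservation goes back to the pool.
        self.pool.used.set(self.pool.used.get() - self.size);
    }
}

/// Replays the suspected sequence against a 1 MiB pool.
fn replay_release_then_regrow() -> Result<(), String> {
    let pool = ToyPool::new(1_048_576);

    let mut sorter = ToyReservation::new(&pool, "SorterA");
    sorter.try_grow(900_000)?;
    drop(sorter); // all 900_000 bytes are returned here

    // Another consumer takes part of the freed memory in the meantime.
    let mut other = ToyReservation::new(&pool, "ConsumerB");
    other.try_grow(500_000)?;

    // The first consumer comes back with 0 bytes held and asks for the same amount.
    let mut sorter = ToyReservation::new(&pool, "SorterA");
    sorter.try_grow(900_000)
}
```

Calling `replay_release_then_regrow()` returns an error for the last request, reporting 0 bytes already allocated and 548576 bytes available, which has the same shape as the SMJStream failure: the consumer that fails holds nothing itself when the request is rejected.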

@comphead
Contributor

Thanks. I'm planning to check next week whether spilling works for the ExternalSorter used by SMJ, and to see how spilling can be made to work for the merge phase as well. The memory pool is injected and SMJ gets new allocations through try_grow, so let's see what we can do.

@comphead
Contributor

comphead commented Jun 6, 2024

Finally starting on it. The test doesn't work anymore

@viirya
Member Author

viirya commented Jun 6, 2024

Yes, I tried the above test a few days ago, but it doesn't work now.

@comphead
Contributor

comphead commented Jun 6, 2024

I made it work that way (notice -m 1m)

 ./target/debug/datafusion-cli -m 1m
DataFusion CLI v38.0.0
> set datafusion.optimizer.prefer_hash_join = false;
0 row(s) fetched. 
Elapsed 0.011 seconds.

> set datafusion.execution.sort_spill_reservation_bytes = 104857;
0 row(s) fetched. 
Elapsed 0.002 seconds.

> set datafusion.execution.sort_in_place_threshold_bytes = 104857;
0 row(s) fetched. 
Elapsed 0.002 seconds.

> select * from (select unnest(range(0, 100000)) id) t inner join (select unnest(range(0, 100000)) id) t1 on t.id = t1.id;
Resources exhausted: Failed to allocate additional 164552 bytes for SMJStream[14] with 0 bytes already allocated - maximum available is 113800
> 

Weirdly, the same thing written as a test doesn't respect the memory pool:

    // Imports assumed for running this inside the DataFusion workspace (v38).
    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::prelude::{SessionConfig, SessionContext};
    use datafusion_execution::memory_pool::GreedyMemoryPool;
    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

    #[tokio::test]
    async fn test_smj_spill() -> Result<()> {
        // 1 MiB memory pool, mirroring `-m 1m` in the CLI session above.
        let rt_config = RuntimeConfig::new()
            .with_memory_pool(Arc::new(GreedyMemoryPool::new(1048576)));
        let runtime_env = RuntimeEnv::new(rt_config)?;
        let session_config = SessionConfig::from_env()?
            .with_information_schema(true)
            .with_sort_spill_reservation_bytes(104857)
            .with_sort_in_place_threshold_bytes(104857);

        let ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime_env));

        // Make the planner prefer SortMergeJoin over HashJoin.
        let sql = "set datafusion.optimizer.prefer_hash_join = false;";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "
select * from (select unnest(range(0, 100000)) id) t inner join (select unnest(range(0, 100000)) id) t1 on t.id = t1.id
        ";

        let actual = ctx.sql(sql).await?.collect().await?;
        // Prints the number of record batches returned, not the number of rows.
        println!("{}", actual.len());
        println!("=============================================================================");

        Ok(())
    }

@comphead
Contributor

comphead commented Jun 7, 2024

The first use case is to try spilling for buffered data, as the buffered data comes in at full size and eats the memory.
This is shown by the query below:

> select * from (select unnest(range(0, 1)) id) t inner join (select unnest(range(0, 100000)) id) t1 on t.id = t1.id;

UPD: Buffered data comes in by partitions, every partition gets processed sequentially. The flow is approx:

read partition
&batch.num_rows() = 3
try_grow memory
join output
join output
join output
shrink

read partition
&batch.num_rows() = 1
try_grow memory
join output
shrink

It looks like spilling is needed in one place only.
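
The flow above can be sketched as a loop with a single spill point at the try_grow step. This is a toy model under stated assumptions, not the SMJ implementation: `Budget` is a hypothetical stand-in for a memory reservation, each buffered row is assumed to cost a fixed `bytes_per_row`, and each buffered row is assumed to produce exactly one join output, as in the trace above.

```rust
/// Hypothetical stand-in for a memory reservation with a fixed byte budget.
struct Budget {
    limit: usize,
    used: usize,
}

impl Budget {
    /// Reserve `bytes` if they fit in the budget; report whether that worked.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.limit {
            return false;
        }
        self.used += bytes;
        true
    }

    /// Give `bytes` back to the budget.
    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

/// Processes buffered partitions one after another and records each step.
/// The only place that can decide to spill is the failed `try_grow`.
fn process_partitions(
    partition_rows: &[usize],
    bytes_per_row: usize,
    budget: &mut Budget,
) -> Vec<String> {
    let mut trace: Vec<String> = Vec::new();
    for &rows in partition_rows {
        trace.push(format!("read partition ({} rows)", rows));
        let bytes = rows * bytes_per_row;
        let in_memory = budget.try_grow(bytes);
        if in_memory {
            trace.push("try_grow memory".to_string());
        } else {
            // Single spill point: the partition does not fit, so it would go to disk.
            trace.push("spill".to_string());
        }
        for _ in 0..rows {
            trace.push("join output".to_string());
        }
        if in_memory {
            budget.shrink(bytes);
            trace.push("shrink".to_string());
        }
    }
    trace
}
```

For partitions of 3 and 1 rows at 8 bytes per row with a 16-byte budget, the first partition takes the spill branch and the second follows the try_grow, join output, shrink path, leaving the budget at zero afterwards.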

5 participants