Improvements in HashAggregationExec when spilling #7858

Closed
milenkovicm opened this issue Oct 18, 2023 · 21 comments
Labels
enhancement New feature or request

Comments

@milenkovicm
Contributor

milenkovicm commented Oct 18, 2023

Is your feature request related to a problem or challenge?

I'd like to share a few observations after putting hash aggregation (#7400) through a stress test. DataFusion v32 was used.

Before I start, I apologise if I have misunderstood something; the aggregation code has changed a lot since I last looked at it.

I've created a test case that should put hash aggregation under pressure and force it to spill, so that I can observe query behaviour. The data contains about 250M rows with 50M unique uids, which are used as the aggregation key. It is a Parquet file of around 3GB.

Memory pool is configured as follows:

let runtime_config = RuntimeConfig::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(2_000_000_000)))

with 4 target partitions.

Query is rather simple:

select count(*), sum(id), sum(co), sum(n), uid from ta group by uid 

I expected that the query would not fail with ResourcesExhausted, since the aggregation should spill under memory pressure. This was not the case.

Describe the solution you'd like

A few low-hanging fruits that could be addressed:

  1. sort_batch copies data, if I'm not mistaken. As a spill is usually triggered under memory pressure, in most cases for all partitions at around the same time, it effectively doubles the memory needed (in most cases I've observed ~2.5x more memory used than configured for the memory pool).

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L672

  2. Spill writes the whole state as a single batch, which is a problem later when we try to merge all those files:

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L676

  3. Not sure what the reason is for the checks at:

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L591

and

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L699

Available improvements, IMHO:

  1. Before spilling, we could split the batch into smaller blocks, sort those smaller blocks, and write one spill file per block; at the moment we write a single file. I'm not sure what the strategy for splitting the batch into smaller blocks would be; we should also take care not to have too many open files.

  2. Write more than one batch per spill

fn spill(&mut self) -> Result<()> {
    let emit = self.emit(EmitTo::All, true)?;
    let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
    let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
    let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
    // TODO: slice large `sorted` and write to multiple files in parallel
    let mut offset = 0;
    let total_rows = sorted.num_rows();

    // Write the sorted state as a sequence of batches of at most `batch_size` rows
    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, self.batch_size);
        let batch = sorted.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }

    writer.finish()?;
    self.spill_state.spills.push(spillfile);
    Ok(())
}
  3. Can we remove the if check at

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L591

and change the if check at

https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L699

to self.group_values.len() > 0?

It would make more sense to "send" a smaller batch than to fail with ResourcesExhausted.
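
The difference between the two checks can be sketched in isolation. This is a minimal model, assuming the existing check only allows a spill once at least one full batch of groups is buffered (as described later in this thread); `OomAction`, `strict_check` and `relaxed_check` are hypothetical names, not DataFusion APIs.

```rust
/// Hypothetical reactions of an aggregation stream to a failed memory reservation.
#[derive(Debug, PartialEq)]
enum OomAction {
    /// Emit the buffered groups and spill them to disk.
    Spill,
    /// Give up and surface ResourcesExhausted.
    Fail,
}

/// Check as described in the thread: spill only when at least one
/// full batch worth of groups is buffered.
fn strict_check(buffered_groups: usize, batch_size: usize) -> OomAction {
    if buffered_groups >= batch_size {
        OomAction::Spill
    } else {
        OomAction::Fail
    }
}

/// Relaxed check proposed above: spill as long as anything is buffered.
fn relaxed_check(buffered_groups: usize) -> OomAction {
    if buffered_groups > 0 {
        OomAction::Spill
    } else {
        OomAction::Fail
    }
}
```

With 100 buffered groups and a batch size of 8192, the strict variant fails the query while the relaxed one writes a small spill file and carries on.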

Describe alternatives you've considered

No other alternatives considered at the moment

Additional context

I have disabled (commented out) resource accounting in RepartitionExec, as it would be the first operator to fail with ResourcesExhausted. From what I observed, RepartitionExec holds memory for a few batches of data when it raises ResourcesExhausted. The change made in #4816 makes sense, but in my test RepartitionExec was the first to give up, before the aggregation spill could occur.

@milenkovicm milenkovicm added the enhancement New feature or request label Oct 18, 2023
@milenkovicm
Contributor Author

Maybe @kazuyukitanimura can help me with this issue

@alamb
Contributor

alamb commented Oct 18, 2023

Other potential "strategies" to avoid the memory overshoots might be

reserve 2x the memory needed for the hash table, to account for sorting on spill,

Pros: ensures the memory budget is preserved

Cons: will cause more, smaller spill files, and will cause some queries to spill even if they could have fit entirely in memory (i.e. when they need more than half of the budget)

write unsorted data to spill file

The process would look like

  1. Write (unsorted) batches to disk
  2. When all batches exist, read in each spill file (maybe in 2 parts) and rewrite newly sorted files
  3. do the final merge with the sorted files

Pros: ensures we don't spill unless the memory reservation is actually exhausted

Cons: Each row is now read/rewritten twice rather than just once
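
The second strategy can be sketched with in-memory stand-ins for spill files. This is an illustration only, assuming each spill file is modelled as a `Vec<u64>` of group keys; `sort_runs` and `merge_runs` are hypothetical helpers, and a real merge would also combine the aggregate state of equal keys.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Step 2: read each unsorted "spill file" back and rewrite it sorted.
fn sort_runs(unsorted: Vec<Vec<u64>>) -> Vec<Vec<u64>> {
    unsorted
        .into_iter()
        .map(|mut run| {
            run.sort_unstable();
            run
        })
        .collect()
}

/// Step 3: k-way merge of the sorted runs. The min-heap holds one
/// (key, run index, position) cursor per non-empty run.
fn merge_runs(runs: &[Vec<u64>]) -> Vec<u64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((key, i, pos))) = heap.pop() {
        out.push(key);
        // Advance the cursor of the run the smallest key came from.
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}
```

Sorting happens one file at a time, so only a single run needs to fit in memory during step 2, at the cost of the extra read and rewrite listed under "Cons".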

@kazuyukitanimura
Contributor

Thank you @milenkovicm for stress-testing and the improvement ideas.
Here are my quick thoughts;

For 1 & 2, I think we can break down the emit into smaller chunks even before sort_batch(). We can flush the sorted chunks to disk before we start sorting the next group of unsorted chunks. That way, we can reduce (control) the memory footprint for sorting. We still need a budget for sorting, but a smaller one. This is somewhat of a mix of @alamb's two strategies.

By this comment
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L675
the plan was to break down a large chunk and write the pieces in parallel (maybe with pooling) so that single-threaded disk writing won't be blocking.
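
A rough sketch of the chunked approach, assuming rows are simplified to `(key, value)` pairs and that writing to disk is abstracted behind a caller-supplied `flush` closure; `spill_in_chunks` is a hypothetical name, not an existing function.

```rust
/// Split the emitted rows into chunks of at most `chunk_rows`, sort each
/// chunk on its own and hand it to `flush` before touching the next chunk.
/// Returns the number of sorted runs produced.
fn spill_in_chunks<F>(emitted: &[(u64, i64)], chunk_rows: usize, mut flush: F) -> usize
where
    F: FnMut(Vec<(u64, i64)>),
{
    assert!(chunk_rows > 0, "chunk_rows must be positive");
    let mut runs = 0;
    for chunk in emitted.chunks(chunk_rows) {
        // The only extra copy alive at any time is one chunk, not the whole state.
        let mut sorted = chunk.to_vec();
        sorted.sort_unstable_by_key(|&(key, _)| key);
        flush(sorted);
        runs += 1;
    }
    runs
}
```

The additional memory needed for sorting is then bounded by the chunk size instead of the size of the whole emitted state, and each flushed chunk is a sorted run that can later be merged.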

For 3, if ResourcesExhausted happens but group_values.len() is shorter than self.batch_size, that means even a single batch worth of data did not fit in memory. In that case, we need to stop proceeding from that point. The assumption is that the memory configuration can hold at least one batch in memory. Alternatively, batch_size can be reduced. I think we still need these checks even if we spill smaller amounts of data at a time.

Cheers!

@milenkovicm
Contributor Author

Thanks @kazuyukitanimura , @alamb for your comments,

I guess there is no perfect way to address 1.

Another alternative for 1 might be to sort data in GroupValuesRows.
We could create a SortedGroupValuesRows which would have BTreeMap<GroupingKeyBlob, offset> instead of

https://github.com/apache/arrow-datafusion/blob/b6e4c8238031ae17373d9ae3be2def4b57645e42/datafusion/physical-plan/src/aggregates/group_values/row.rs#L47

this would make state sorted by key, thus no need for RecordBatch sorting later.

There should be a conversion from GroupValuesRows to SortedGroupValuesRows, which would involve copying only the table entries from the Map to the BTree. This conversion (sorting) would be performed on the first spill; SortedGroupValuesRows would then be converted to a batch and spilled. Once the first spill has been done, AggregateExec should continue using SortedGroupValuesRows for its state, as we know at that point that the keys need to be sorted for an eventual spill or merge.
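
The conversion can be illustrated with standard collections. This is a sketch under the assumption that the group table maps a serialized key to a group index; `GroupingKeyBlob` is modelled as `Vec<u8>`, and `to_sorted` and `emit_order` are hypothetical helpers rather than part of GroupValuesRows.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for a serialized grouping key.
type GroupingKeyBlob = Vec<u8>;

/// Convert a hash-based key -> group-index table into an ordered one.
/// Only the (key, index) entries move; the aggregate state they point
/// to is left where it is.
fn to_sorted(map: HashMap<GroupingKeyBlob, usize>) -> BTreeMap<GroupingKeyBlob, usize> {
    map.into_iter().collect()
}

/// Group indices in key order, i.e. the order in which state would be
/// emitted for a spill without a separate RecordBatch sort.
fn emit_order(sorted: &BTreeMap<GroupingKeyBlob, usize>) -> Vec<usize> {
    sorted.values().copied().collect()
}
```

The trade-off is that lookups and inserts in a `BTreeMap` are logarithmic rather than constant time on average, which is why the proposal only switches to the sorted structure after the first spill.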

I get your idea regarding point 3, @kazuyukitanimura. I've tried to tune memory and batch size; what I observed is that once the system is in the border area, this if check will fail execution. It is quite non-deterministic when it is going to fail.
I agree that spill should not produce a lot of small files, but in practice I believe small spills would be just temporary, until one of the bigger consumers, which are under memory pressure as well, spills its state. Thus, I believe removing these particular checks would improve robustness. wdyt?

@milenkovicm
Contributor Author

SortMergeJoin has with_sort_spill_reservation_bytes; maybe something similar could be added for HashAggregationExec.

I find it very hard to reason about MemoryReservation

@alamb
Contributor

alamb commented Oct 19, 2023

I find it very hard to reason about MemoryReservation

@milenkovicm is there anything we can do to make it easier to reason about? For example, I could try to improve the documentation if you can explain what you find hard to reason about.

@milenkovicm
Contributor Author

Thanks @alamb, leave it with me for now.

@kazuyukitanimura
Contributor

I believe small spill would be just a temporary, until one of the bigger consumer spills their state, and they are as well under memory pressure. Thus, believe removing this particular checks would improve robustness. wdyt?

Thanks for the additional context @milenkovicm
Did you mean you observed that your stress-testing jobs are succeeding if those checks are removed?

@milenkovicm
Contributor Author

Yes, that is right. I don't see an avalanche of small spills, and the system does not run out of memory.

I will test it further and capture spill sizes.

@milenkovicm
Contributor Author

milenkovicm commented Oct 24, 2023

I've done some more testing with spilling and as I mentioned in my previous message I don't see "small" spills.

RepartitionExec is another operator which complains about memory consumption. To drill down into the problem, I have changed RepartitionExec to print a warning instead of panicking when it goes over the given memory threshold.

If we take a simple physical plan:

Optimized physical plan:
  ProjectionExec: expr=[COUNT(*)@1 as COUNT(*), SUM(ta.id)@2 as SUM(ta.id), SUM(ta.co)@3 as SUM(ta.co), SUM(ta.n)@4 as SUM(ta.n), uid@0 as uid]
    GlobalLimitExec: skip=0, fetch=10
      CoalescePartitionsExec
        AggregateExec: mode=FinalPartitioned, gby=[uid@0 as uid], aggr=[COUNT(*), SUM(ta.id), SUM(ta.co), SUM(ta.n)]
          CoalesceBatchesExec: target_batch_size=8192
            RepartitionExec: partitioning=Hash([uid@0], 4), input_partitions=4
              AggregateExec: mode=Partial, gby=[uid@2 as uid], aggr=[COUNT(*), SUM(ta.id), SUM(ta.co), SUM(ta.n)]
                ParquetExec: file_groups={4 groups: [[ ... ]]}, projection=[id, co, uid, n]

I have noticed that the RepartitionExec memory warnings correlate with spilling in AggregateExec: mode=FinalPartitioned.

My guess is that the unbounded channel in RepartitionExec acts as a buffer during the AggregateExec spill.
This raises another question: what would be the optimal way of handling memory reservation in RepartitionExec, or of propagating back pressure between operators?

Maybe blocking until some free space is available would make sense instead of aggressively returning an out-of-memory error, but I'm not quite sure how that would be implemented.
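
The blocking idea can be illustrated with a bounded channel. This is a sketch only: it uses std threads and `std::sync::mpsc::sync_channel` instead of the async channels an actual operator would use, and `run_with_backpressure` is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Send `n_batches` batch ids through a channel that holds at most
/// `capacity` in-flight items. `send` blocks while the channel is full,
/// so a slow consumer slows the producer down instead of letting
/// buffered batches pile up without limit.
fn run_with_backpressure(n_batches: usize, capacity: usize) -> Vec<usize> {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let producer = thread::spawn(move || {
        for batch_id in 0..n_batches {
            // Blocks here whenever `capacity` batches are already queued.
            tx.send(batch_id).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });
    // The consumer drains the channel in order.
    let received: Vec<usize> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```

With a bound in place, the memory held by queued batches is capped by `capacity` times the batch size, which is the property the unbounded channel lacks while the downstream aggregation is busy spilling.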

@alamb
Contributor

alamb commented Oct 24, 2023

I think another issue is that the RepartitionExec reserves memory on demand, so if memory is almost out (as it has been consumed by the AggregateExec) it may not be able to get even a small amount.

The way to get the query to run is to restrict the memory used by the AggregateExec so there is still reasonable memory available for other operators

One potential thing would be to use a different memory strategy (e.g. a Fair pool)

@milenkovicm
Contributor Author

FairSpillPool has been used in all tests. It does not matter whether memory is increased or decreased; RepartitionExec will eventually complain about memory allocation.

@milenkovicm
Contributor Author

Memory management is fun, still looking at it :)

Two questions, first one

should MemoryConsumer for GroupedHashAggregateStream be with_can_spill=true ?

https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/aggregates/row_hash.rs#L331

i.e. should it be:

let reservation = MemoryConsumer::new(name).with_can_spill(true).register(context.memory_pool());
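
Why the flag matters can be seen in a simplified model of a fair spill pool. This is not the DataFusion implementation; it assumes, following the documented behaviour of FairSpillPool, that each spillable consumer is capped at an even share of the pool after subtracting memory held by unspillable consumers. `FairPoolModel` and its methods are hypothetical.

```rust
/// Simplified model of a fair spill pool (illustrative only).
struct FairPoolModel {
    /// Total pool size in bytes.
    pool_size: usize,
    /// Bytes currently held by consumers that cannot spill.
    unspillable: usize,
    /// Number of registered consumers that can spill.
    num_spillable: usize,
}

impl FairPoolModel {
    /// Upper bound, in bytes, for each spillable consumer.
    fn spillable_share(&self) -> usize {
        self.pool_size.saturating_sub(self.unspillable) / self.num_spillable.max(1)
    }

    /// Would a spillable consumer holding `held` bytes be allowed to
    /// grow by `additional` bytes?
    fn can_grow_spillable(&self, held: usize, additional: usize) -> bool {
        held + additional <= self.spillable_share()
    }
}
```

In this model, a consumer that is not registered as spillable is not subject to the per-consumer cap, so an aggregation registered without the flag can keep growing until the pool is empty and leave nothing for other operators.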

Second question: can MemoryReservation expose its consumer? The reason is that this would let us create useful MemoryManager implementations outside of the datafusion crate.

impl MemoryReservation {
    /// Returns the consumer this reservation was registered for
    pub fn consumer(&self) -> &MemoryConsumer {
        &self.registration.consumer
    }
}

@kazuyukitanimura
Contributor

kazuyukitanimura commented Oct 27, 2023

should MemoryConsumer for GroupedHashAggregateStream be with_can_spill=true ?

I think this is a good point

@milenkovicm
Contributor Author

should MemoryConsumer for GroupedHashAggregateStream be with_can_spill=true ?

I think this is a good point

It might explain why Fair pool is not behaving as expected, but I cannot explain why I needed so much time to spot it 😀

@milenkovicm
Contributor Author

@alamb & @kazuyukitanimura
I have created three smaller MRs which address some of the issues identified in this thread; please have a look if you have time.

@milenkovicm
Contributor Author

Hi @alamb & @kazuyukitanimura,
I'd like to close this issue, as spill now works as expected with FairPool. Before I do, I'd like to check with you whether we want to create a new issue to capture future work addressing the unaccounted sort memory usage.

@alamb
Contributor

alamb commented Nov 20, 2023

Hi @alamb & @kazuyukitanimura, I'd like to close this issue spill works as expected now with FairPool, but before I do I'd like to check with you if we want to create new issue which will capture future work which would address unaccounted sort memory usage?

I think tracking outstanding bugs / known potential deficiencies would be a good idea. Thank you for your diligence @milenkovicm

@kazuyukitanimura
Contributor

Thank you @milenkovicm I agree that we can create future work tickets and close this one.

@milenkovicm
Contributor Author

As per our discussion, I'm closing this issue, as I've captured the leftovers in #8428.
thanks @kazuyukitanimura and @alamb

@alamb
Contributor

alamb commented Dec 5, 2023

Thank you @milenkovicm ❤️
