Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Change Parquet file splitting logic to split all files #3454

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

jaychia
Copy link
Contributor

@jaychia jaychia commented Nov 29, 2024

  1. Performs splitting of Parquet files on all files instead of just the first N
  2. Use ScanTask::scantask_estimated_size_bytes as the metric for splitting instead of the DataSource's size on disk
  3. Refactors the code to use an accumulator struct instead

Changes in logic highlighted as PR comments.

I think this will actually result in performance regressions in many cases, as planning will take longer for cases where we think we need to split the ScanTask. Especially for something like a .show() of a really large dataset. Should do some benchmarking to see how this affects various workloads.

TODO:

  • Perform bulk downloads of Parquet metadata instead of one-at-a-time

@github-actions github-actions bot added the enhancement New feature or request label Nov 29, 2024
Copy link

codspeed-hq bot commented Nov 29, 2024

CodSpeed Performance Report

Merging #3454 will not alter performance

Comparing jay/split-all-files (65d6e2b) with main (a16a045)

Summary

✅ 17 untouched benchmarks

Copy link

codecov bot commented Nov 30, 2024

Codecov Report

Attention: Patch coverage is 96.22642% with 6 lines in your changes missing coverage. Please review.

Project coverage is 77.51%. Comparing base (794a4fd) to head (65d6e2b).
Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-scan/src/scan_task_iters.rs 96.22% 6 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3454      +/-   ##
==========================================
+ Coverage   77.49%   77.51%   +0.01%     
==========================================
  Files         687      687              
  Lines       84475    84545      +70     
==========================================
+ Hits        65464    65531      +67     
- Misses      19011    19014       +3     
Files with missing lines Coverage Δ
src/daft-scan/src/scan_task_iters.rs 96.20% <96.22%> (-0.73%) ⬇️

... and 5 files with indirect coverage changes

[source] => source,
_ => unreachable!(
"SplitByRowGroupsAccumulator should only have one DataSource in its ScanTask"
),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a fairly odd constraint that I inherited from the previous logic. I think we should be able to also split ScanTasks with more than 1 DataSource, but not a priority atm.

self.num_rows += rg.num_rows();
self.row_group_indices.push(*rg_idx);

// Flush the accumulator if necessary
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic has been changed from the past:

  1. We use ScanTask::estimated_size_bytes when adding to the self.size_bytes accumulator, instead of the size of the file on disk. This should give us a "more accurate" sizing than before, since it should account for things such as column pruning (ideally).

  2. When ScanTask::estimated_size_bytes is not provided, we just always flush (i.e. every rowgroup becomes its own ScanTask). Alternatively, we can just avoid splitting at all. Not sure what the intended behavior there should be.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant