-
Notifications
You must be signed in to change notification settings - Fork 841
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Selective decoding of a subset (e.g. columns or row groups) of parquet metadata #5855
Comments
A low-hanging fruit would be selectively converting thrift row groups to parquet-rs row groups. Currently, we build all row group metadata, which takes 30% of the decoding time (as will shown in the #5770 blog post). arrow-rs/parquet/src/file/footer.rs Lines 99 to 101 in 087f34b
If we can filter out the row group early based on the column stats, we can easily skip those row groups. But this requires changing the decode API, and potentially how we open a parquet file... which is not clear to me arrow-rs/parquet/src/file/footer.rs Line 54 in 087f34b
|
Nice -- I updated the title to reflect this |
I thought about an alternative (but similar) approach to Pinterest's solution -- instead of decoding and building in-memory structs along the way, we can decouple it and make it two passes. In the first pass, decode the thrift metadata but do not build the in-memory structures (i.e., no memory allocation, etc). Instead, we only track the location of the important structures. Specifically, instead of building the column chunk structs as we see it, we track the location of the column chunk (offset to the buffer). In the second pass -- when we actually need to read a column chunk -- we build the in-memory data structure on demand, using the offset tracked in the first pass. This approach has the advantage of selective decoding (faster, lower memory consumption, etc.) and does not need to change the decoding API (unlike the Pinterest approach). However, it is suboptimal if we actually need to decode the entire metadata, in which case the first pass is pure overhead. Presuming that machine learning workloads (or, in general, wide tables) present high selectivity, we should still save quite a lot. |
I think a good first step would be to analyse where the current overheads actually lie, the Pinterest article says the major benefit is avoiding allocations. Whilst #5777 doesn't avoid all allocations, it does eliminate the vast majority of them. If this isn't moving the needle, it might suggest the bottleneck is something different. IMO the two pass solution you describe is not hugely dissimilar from what that PR is doing. |
Below is the flamegraph of decoding parquet metadata and allocation itself does not show up as the bottleneck. The term "allocation" is ambiguous, it can refer to allocation operations, it can also mean excessive memory footprint (and the efforts to set them up), and I believe the later is the bottleneck. More formally, I believe the time is spent on two types of tasks: (1) decoding, interpreting the thrift data, which can be SIMD acclerated (2) setup the in-memory sturcture, i.e., inflating the 10MB metadata into 100MB of in-memory representation, which is solved by skipping columns/row groups. |
I'm curious how you intend to SIMD accelerate varint decoding, its a case study in being unfriendly to that... But optimising that aspect I would see as more obviously going to improve decode speed. Regarding the linked flamegraph I'm not sure I'm seeing memory overheads dominating, there isn't much time being spent in allocation/memcpy routines within decode_metadata, nor are we getting decode throughput at a level where I would expect memory throughput bottlenecks to be relevant <1Gpbs. My reading of that flamegraph is the inlined varint decoding is dominating the runtime of that benchmark |
On SIMD decoding varint: https://github.com/as-com/varint-simd Memory overhead does not show in flamegraph as it is implicit (caused by cpu memory stalls rather than function calls). It is mostly based on speculations... In #5854 (comment) I discussed a simple trick that improves the performance by up to 30%, by reducing the memory footprint and reduce implicit memory movement. On memory bandwidth: it takes 300ms to decode 100k columns, which allocates 600MB memory, that is ~2GB per second. |
I did not see varint decoding as a bottleneck in my benchmarks. I experimented with using BMI2 instructions, but that still requires at least one branch to check whether we can read 8 bytes at a time and fallback to sequential code if not, and for numbers larger than 8*7 bits one or two more branches. That does not make the code much smaller anymore, and assuming mostly small integers and good branch prediction there seems to be no big improvement. I think most of the unaccounted time in the flamegraph is related to moving of data on the stack, which rust still does not optimize that well. |
Fair enough, I guess we're already much faster than I realised 😅 I think this is definitely an interesting avenue to explore, however, I remain a little hesitant about the complexity vs real-world performance benefit. If we're already at the point where memory throughput is the bottleneck, what hope does any IO subsystem realistically have of keeping up, thrift compression isn't that good. Then again maybe I'm just being a curmudgeon about how practical a 100k column file with 10 row groups even is... It'd be 1TB if each column chunk is 1MB. |
To be clear, I don't have a need personally (and I don't think InfluxData has a technical need) at the moment to actually invest the engineering time to make the thrift decoding faster. Instead, my goal of filing these tickets is to leave sufficient information / analysis for anyone for whom it is important (e.g. machine learning use cases, potential users of a "Parquet V3" ,etc) that they could undertake the work if it was actually critical to their workload I tend to agree with @tustvold that there are likely only a few real world senarios where the system bottleneck is parquet decoding, though I am sure we can come up with them |
It turns out that someone on discord found out there is a TODO in the code: https://discord.com/channels/885562378132000778/885562378132000781/1252058490512478300 That seems to hint at the ideas in this ticket arrow-rs/parquet/src/file/footer.rs Line 92 in 0d1511c
|
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As part of #5853 we are considering ways to improve parquet metadata decoding speed in the context of files with wide schemas (e.g. 10,000 columns)
One common observation is that many queries only require a small subset of the columns, but because of how standard thrift decoders are implemented, they must decode the entire metadata even if only a subset of the columns is needed
Due to the Apache Thrift variable length encoding, the decoder likely still requires scanning the entire metadata, but there is no need to create rust structs for fields that will not be read.
Simply skipping such fields I think would likely result in substantial savings. Some evidence for this is @jhorstmann's prototype that avoids copying structs off the stack results in 2x performance improvements. See #5775 (comment)
Thus we could likely optimize the decoding of metadata for large schemas even more by selectively decoding only the fields needed. This idea also described at a high level here: https://medium.com/pinterest-engineering/improving-data-processing-efficiency-using-partial-deserialization-of-thrift-16bc3a4a38b4
Describe the solution you'd like
Implement some sort of projection pushdown when decoding metadata. Perhaps we could add a projection argument to this API https://docs.rs/parquet/latest/parquet/file/footer/fn.decode_metadata.html
Describe alternatives you've considered
Additional context
The text was updated successfully, but these errors were encountered: