Use C++ to parse and filter parquet footers. #5310

Conversation
Signed-off-by: Robert (Bobby) Evans <[email protected]>
Review comment on the diff:

```scala
ParquetMetadataConverter.range(file.start, file.start + file.length))
val footer = footerReader match {
  case ParquetFooterReaderType.NATIVE =>
    System.err.println("NATIVE FOOTER READER...")
```

Switch to real logging once done with WIP.

agree with @gerashegalov
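A minimal sketch of what the requested change might look like, assuming the enclosing class mixes in Spark's `org.apache.spark.internal.Logging` trait (as most of the plugin's classes do); the class name and message here are illustrative, not the PR's code:

```scala
import org.apache.spark.internal.Logging

// Hypothetical illustration of swapping the WIP println for real logging.
class FooterReaderSelection extends Logging {
  def onNativeSelected(): Unit = {
    // logDebug comes from the Logging trait; its message argument is
    // evaluated lazily, so it costs nothing when debug logging is off.
    logDebug("Using the NATIVE Parquet footer reader")
  }
}
```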
Review comment on the diff:

```scala
@throws[IOException]
override def read(buf: ByteBuffer): Int =
  if (buf.hasArray) {
```

Maybe add a small comment somewhere on why we need two implementations, one for direct and one for JVM byte buffers.
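For context, a minimal sketch of why the two paths exist, using a hypothetical `InputStream`-backed wrapper standing in for the PR's actual stream reader: heap (JVM) `ByteBuffer`s expose a backing array that can be read into directly, while direct buffers do not, so their bytes must be staged through a temporary array.

```scala
import java.io.{IOException, InputStream}
import java.nio.ByteBuffer

class ByteBufferRead(in: InputStream) {
  @throws[IOException]
  def read(buf: ByteBuffer): Int =
    if (buf.hasArray) {
      // Heap buffer: read straight into the backing array, then
      // advance the buffer's position by the amount actually read.
      val amount = in.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining())
      if (amount > 0) {
        buf.position(buf.position() + amount)
      }
      amount
    } else {
      // Direct buffer: no accessible backing array, so stage the bytes
      // through a temporary heap array and bulk-put them into the buffer.
      val tmp = new Array[Byte](buf.remaining())
      val amount = in.read(tmp)
      if (amount > 0) {
        buf.put(tmp, 0, amount)
      }
      amount
    }
}
```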
"msg=constructor ParquetFileReader in class ParquetFileReader is deprecated" | ||
) | ||
private def addNamesAndCount(names: ArrayBuffer[String], children: ArrayBuffer[Int], | ||
name: String, num_children: Int): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name: String, num_children: Int): Unit = { | |
name: String, numChildren: Int): Unit = { |
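To illustrate what a helper with this signature likely does (a hypothetical reconstruction, not the PR's code): it appends one entry per schema node to a pair of parallel arrays, a pre-order flattening of the nested schema that is convenient to hand across JNI to the native footer parser.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical reconstruction: record a field name and its child count.
def addNamesAndCount(names: ArrayBuffer[String], children: ArrayBuffer[Int],
    name: String, numChildren: Int): Unit = {
  names += name
  children += numChildren
}

val names = ArrayBuffer.empty[String]
val children = ArrayBuffer.empty[Int]
addNamesAndCount(names, children, "root", 2) // a struct with two fields
addNamesAndCount(names, children, "id", 0)   // leaf column
addNamesAndCount(names, children, "name", 0) // leaf column
// names    == ArrayBuffer(root, id, name)
// children == ArrayBuffer(2, 0, 0)
```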
build

build
@jlowe @abellina @gerashegalov could you please take another look? I think I have addressed all of the review comments.
All my comments are minor, mostly about potentially simplifying/optimizing the copies by leveraging HostMemoryBuffer's ability to alias as a DirectByteBuffer. Nothing is must-fix on my end.
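A sketch of the optimization being suggested, assuming the cudf Java API's `HostMemoryBuffer.asByteBuffer()` (the aliasing ability the comment refers to); sizes and values here are illustrative:

```scala
import ai.rapids.cudf.HostMemoryBuffer

val hmb = HostMemoryBuffer.allocate(4096)
try {
  // asByteBuffer() views the same host memory as a direct ByteBuffer
  // rather than copying it, so reads and writes go straight through
  // to the underlying host allocation.
  val bb = hmb.asByteBuffer()
  bb.putInt(42)
} finally {
  hmb.close()
}
```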
build
Thanks for the review @jlowe. I think I have addressed everything.
Review comment on the diff:

```scala
@throws[IOException]
override def readFully(buf: ByteBuffer): Unit = {
  val requested = buf.remaining()
  val avail = available()
```

I was curious why this `readFully` checks `available()` and throws before the read, whereas the others check the amount read and throw after the read. But this is not a blocker.
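For contrast, hedged sketches (hypothetical helpers, not the PR's code) of the two error-handling styles the comment describes. One practical difference: the pre-check leaves the buffer untouched on failure, while the post-check may have already partially filled it.

```scala
import java.io.EOFException
import java.nio.ByteBuffer

// Pre-check style: verify enough bytes remain before reading anything.
def readFullyPreCheck(buf: ByteBuffer, available: Int)(doRead: ByteBuffer => Int): Unit = {
  val requested = buf.remaining()
  if (requested > available) {
    throw new EOFException(s"Requested $requested bytes but only $available are available")
  }
  doRead(buf)
}

// Post-check style: attempt the read, then verify how much actually arrived.
def readFullyPostCheck(buf: ByteBuffer)(doRead: ByteBuffer => Int): Unit = {
  val requested = buf.remaining()
  val amount = doRead(buf)
  if (amount < requested) {
    throw new EOFException(s"Requested $requested bytes but only read $amount")
  }
}
```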
Converted to draft because I hit a bug with the latest code that I need to debug.
This reverts commit 62de08b.
build
I reverted the last set of changes to the StreamReader; they were causing issues when reading large files. I don't know why ByteBuffers are only used for large files, but they are. Because the review comments were nits, I decided to revert and will file a follow-on issue to see if we can make that code work properly, along with some investigation into why the tests were not exercising that code path.

I filed #5452 as the follow-on issue.
build
This still needs tests and documentation, but I wanted to get a PR up sooner rather than later.
This depends on NVIDIA/spark-rapids-jni#199