Update to nvcomp-2.x JNI APIs #3757
Conversation
Signed-off-by: Jim Brennan <[email protected]>
Converted to Draft because rapidsai/cudf#9384 is not in yet, and is itself still in draft.
Thanks Bobby.
I think the main thing is whether there's a way to do the lz4 config slightly differently. I know that's how I had prototyped it... (sorry ahead of time).
@@ -31,7 +32,8 @@ class NvcompLZ4CompressionCodec extends TableCompressionCodec with Arm {
       contigTable: ContiguousTable,
       stream: Cuda.Stream): CompressedTable = {
     val tableBuffer = contigTable.getBuffer
-    val (compressedSize, oversizedBuffer) = NvcompLZ4CompressionCodec.compress(tableBuffer, stream)
+    val (compressedSize, oversizedBuffer) =
+      NvcompLZ4CompressionCodec.compress(tableBuffer, codecConfigs.lz4ChunkSize, stream)
nit, since we have `codecConfigs` everywhere, may as well pass it to `compress`?
I made the change to replace codecConfigs with rapidsConf as a constructor argument, and then I set a val in the class for the lz4ChunkSize, which is what I pass here now.
Using RapidsConf didn't work in general because it is not serializable, so I did change this to pass codecConfigs.
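As a hedged illustration of that point, the sketch below shows why a small, dedicated config object works in task closures where `RapidsConf` does not: only a serializable value object gets shipped to executors. The `TableCompressionCodecConfig` name and shape here are stand-ins for illustration, not the plugin's actual type.

```scala
// Stand-in for the plugin's codec config: a small serializable value object that
// can be captured in Spark task closures, unlike the full (non-serializable) RapidsConf.
case class TableCompressionCodecConfig(lz4ChunkSize: Int)

// Built once on the driver from the full configuration, then shipped to executors
// as part of the closure that does the per-partition compression work.
def codecConfigsFrom(lz4ChunkSize: Int): TableCompressionCodecConfig =
  TableCompressionCodecConfig(lz4ChunkSize)
```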
      input: DeviceMemoryBuffer,
      lz4ChunkSize: Int,
      stream: Cuda.Stream): (Long, DeviceMemoryBuffer) = {
    val lz4Config = LZ4Compressor.configure(lz4ChunkSize, input.getLength())
does this need to be wrapped in `withResource`?
Not for the LZ4Compressor. The Configuration returned by this method is not closeable, and doesn't need to be. The configuration for LZ4Decompressor does need to be closed, because there is a metadata object that needs to be destroyed.
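For context, here is a minimal sketch of the `withResource` pattern being referred to, assuming an Arm-style helper over `AutoCloseable`; this is a simplified illustration, not the plugin's exact implementation. A closeable decompressor configuration would be passed through it, while the non-closeable compressor configuration would not need it.

```scala
// Simplified withResource sketch: the resource is always closed, even if the
// block throws. In this thread, the closeable LZ4 decompressor configuration
// (which owns native metadata) is the kind of resource that would go here.
def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
  try {
    block(resource)
  } finally {
    resource.close()
  }
}
```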
@@ -1659,6 +1665,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val shuffleCompressionCodec: String = get(SHUFFLE_COMPRESSION_CODEC)

+  lazy val shuffleCompressionLz4ChunkSize: Int = get(SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE)
Suggested change:
lazy val shuffleCompressionLz4ChunkSize: Int = get(SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE)
Good catch! Fixed.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala
        maxBatchMemorySize)
    val inputBuffers: Array[BaseDeviceMemoryBuffer] = tables.map { table =>
      val buffer = table.getBuffer
      // cudf compressor will try to close this batch but this interface does not close inputs
Something like:

-      // cudf compressor will try to close this batch but this interface does not close inputs
+      // cudf compressor guarantees that close will be called for `inputBuffers` and will not throw before
+      // but this interface does not close inputs

i.e. it could be a leak if `compress` throws before it wraps it, but that's not the case as written.
wonder if `compress` should not close?
I will follow up with Jason on this. This was part of the patch he gave me. My guess is that the compress code is trying to free up the input buffers before allocating the final output buffers, to reduce memory pressure. But doing the incRef here defeats that.
I did update the comment for now.
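To make the ownership question concrete, here is a small runnable sketch of the pattern under discussion, with a toy reference-counted buffer standing in for the real cudf types: the callee closes the buffers it is given, so a caller that still needs them bumps the reference count first.

```scala
// Toy stand-in for a reference-counted device buffer (not the real cudf type).
class RefCountedBuffer(val name: String) extends AutoCloseable {
  private var refs = 1
  def incRefCount(): RefCountedBuffer = { refs += 1; this }
  override def close(): Unit = {
    refs -= 1
    if (refs == 0) println(s"$name: underlying memory freed")
  }
}

// The callee follows the "closes its inputs" convention, like the cudf batch compressor.
def compressAll(inputs: Seq[RefCountedBuffer]): Unit = {
  try {
    inputs.foreach(b => println(s"compressing ${b.name}"))
  } finally {
    inputs.foreach(_.close()) // callee releases its reference
  }
}

val buffers = Seq(new RefCountedBuffer("buf0"), new RefCountedBuffer("buf1"))
// The caller keeps its own reference, so the buffers survive the callee's close().
compressAll(buffers.map(_.incRefCount()))
buffers.foreach(_.close()) // caller releases its reference; memory is freed here
```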
    }
    closeOnExcept(batchCompressor.compress(inputBuffers, stream)) { compressedBuffers =>
      withResource(new NvtxRange("lz4 post process", NvtxColor.YELLOW)) { _ =>
        require(compressedBuffers.length == tables.length)
require is nice, I would add a message.
added
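For illustration, a message along these lines is the kind of thing being asked for; the wording and the placeholder values below are assumptions, not quotes from the patch.

```scala
// Placeholder values just to make the check runnable; in the plugin these are
// the compressed output buffers and the corresponding input tables.
val compressedBuffers = Seq("c0", "c1")
val tables = Seq("t0", "t1")

require(compressedBuffers.length == tables.length,
  s"expected one compressed buffer per table, got ${compressedBuffers.length} " +
    s"buffers for ${tables.length} tables")
```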
    extends BatchedBufferDecompressor(maxBatchMemory, stream) {
  override val codecId: Byte = CodecType.NVCOMP_LZ4

  override def decompressAsync(
      inputBuffers: Array[BaseDeviceMemoryBuffer],
      bufferMetas: Array[BufferMeta],
      stream: Cuda.Stream): Array[DeviceMemoryBuffer] = {
    BatchedLZ4Decompressor.decompressAsync(inputBuffers, stream)
    require(inputBuffers.length == bufferMetas.length)
Same here, so we have a human-readable message.
added
    closeOnExcept(batchCompressor.compress(inputBuffers, stream)) { compressedBuffers =>
      withResource(new NvtxRange("lz4 post process", NvtxColor.YELLOW)) { _ =>
        require(compressedBuffers.length == tables.length)
        compressedBuffers.zipWithIndex.map { case (buffer, i) =>
compressedBuffers.zip(tables).map { case (buffer, table) =>

Since we don't need the index.
Thanks - Improved as suggested.
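A small self-contained sketch of the change, using placeholder strings instead of the plugin's buffer and table types: pairing the two sequences with `zip` gives the same pairs as indexing through `zipWithIndex`, without the index bookkeeping.

```scala
// Placeholder elements; in the plugin these are compressed buffers and contiguous tables.
val compressedBuffers = Seq("buf0", "buf1")
val tables = Seq("table0", "table1")

// Before: look up the matching table by index.
val byIndex = compressedBuffers.zipWithIndex.map { case (buffer, i) => (buffer, tables(i)) }
// After: pair the sequences directly.
val byZip = compressedBuffers.zip(tables).map { case (buffer, table) => (buffer, table) }

assert(byIndex == byZip)
```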
    bufferMetas.zip(inputBuffers).safeMap { case (meta, input) =>
      // cudf decompressor will try to close inputs but this interface does not close inputs
      input.incRefCount()
should decompressor not close its inputs?
Going to follow up with Jason on this.
This is to be consistent with the plugin compressor batch API which also does not close its inputs. We could consider changing this, but we'd need to update all callers accordingly.
  val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.lz4.chunkSize")
    .doc("A configurable chunk size to use when compressing with LZ4.")
    .internal()
    .integerConf
nit, this should be `bytesConf(ByteUnit.BYTE)` (my fault)
When I use bytesConf(), it forces the type to be Long, but the APIs for the chunk size all take an integer, so I wanted to define this as an integer.
Actually, it is just in the lz4compressConfigure JNI that we are using an int. I can change this to use longs for the chunk size everywhere, but I will need to make changes in cudf as well. Do you think it is worth it?
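As a hedged sketch, the byte-unit form of the config might look roughly like the following, mirroring the `integerConf` version in the diff above; `bytesConf(ByteUnit.BYTE)` yields a Long, and the default shown is only a placeholder, not a value from this PR.

```scala
// Sketch only: byte-unit config yielding a Long, per the reviewer's suggestion.
// ByteUnit here is Spark's org.apache.spark.network.util.ByteUnit; the default is a placeholder.
val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.lz4.chunkSize")
  .doc("A configurable chunk size to use when compressing with LZ4.")
  .internal()
  .bytesConf(ByteUnit.BYTE)
  .createWithDefault(64 * 1024)

lazy val shuffleCompressionLz4ChunkSize: Long = get(SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE)
```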
Thanks for the review @abellina! I put up another commit that addresses most of them. This does not include changing the chunkSize config to a Long and it does not change the compress/decompress closing inputs behavior.
I pushed another commit to change the chunkSize to long after pushing a corresponding commit in the cudf PR.
This LGTM
I +1'ed the cuDF PR for this. Thanks @jbrennan333
build
Closes #3754.
The nvcomp JNI API in CUDF is being updated to 2.x via rapidsai/cudf#9384
Once that change goes into CUDF, we will need to merge these changes to update the plugin to use the new nvcomp-2.x APIs. This PR also includes changes (from Alessandro) to add a config option to specify the lz4 chunk size.
Note that this PR will not build without the CUDF changes, so we can't build it yet.
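A hedged usage sketch of the new option: enabling the nvcomp LZ4 shuffle codec and overriding the (internal) chunk-size setting when building a Spark session. The codec config key and the 64 KiB value are assumptions for illustration, not values taken from this PR.

```scala
import org.apache.spark.sql.SparkSession

// Example only: the chunk-size key comes from this PR; the codec key and the
// value shown are assumptions for illustration.
val spark = SparkSession.builder()
  .appName("nvcomp-lz4-chunk-size-example")
  .config("spark.rapids.shuffle.compression.codec", "lz4")
  .config("spark.rapids.shuffle.compression.lz4.chunkSize", (64 * 1024).toString)
  .getOrCreate()
```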