From af5c5060f63bc403cb09fef53ef984aa43d47b67 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Mon, 1 Apr 2024 11:32:44 +0800 Subject: [PATCH] [CELEBORN-1363] AbstractRemoteShuffleInputGateFactory supports celeborn.client.shuffle.compression.codec to configure compression codec ### What changes were proposed in this pull request? `AbstractRemoteShuffleInputGateFactory` supports `celeborn.client.shuffle.compression.codec` to configure compression codec. ### Why are the changes needed? `AbstractRemoteShuffleInputGateFactory` only supports LZ4 compression codec via hard code at present. `AbstractRemoteShuffleInputGateFactory` should support `celeborn.client.shuffle.compression.codec` to configure compression codec like ZSTD etc. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes #2435 from SteNicholas/CELEBORN-1363. Authored-by: SteNicholas Signed-off-by: mingji --- .../plugin/flink/AbstractRemoteShuffleInputGateFactory.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java index fcf8907a42d..e6e920b7de1 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java @@ -44,9 +44,6 @@ public abstract class AbstractRemoteShuffleInputGateFactory { /** Number of max concurrent reading channels. */ protected final int numConcurrentReading; - /** Codec used for compression / decompression. */ - protected static final String compressionCodec = "LZ4"; - /** Network buffer size. */ protected final int networkBufferSize; @@ -94,7 +91,7 @@ public IndexedInputGate create( SupplierWithException bufferPoolFactory = createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers); BufferDecompressor bufferDecompressor = - new BufferDecompressor(networkBufferSize, compressionCodec); + new BufferDecompressor(networkBufferSize, celebornConf.shuffleCompressionCodec().name()); return createInputGate(owningTaskName, gateIndex, igdd, bufferPoolFactory, bufferDecompressor); }