Skip to content

Commit

Permalink
[CELEBORN-1363] AbstractRemoteShuffleInputGateFactory supports celebo…
Browse files Browse the repository at this point in the history
…rn.client.shuffle.compression.codec to configure compression codec

### What changes were proposed in this pull request?

`AbstractRemoteShuffleInputGateFactory` supports `celeborn.client.shuffle.compression.codec` to configure compression codec.

### Why are the changes needed?

`AbstractRemoteShuffleInputGateFactory` only supports LZ4 compression codec via hard code at present. `AbstractRemoteShuffleInputGateFactory` should support `celeborn.client.shuffle.compression.codec` to configure compression codec like ZSTD etc.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2435 from SteNicholas/CELEBORN-1363.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
SteNicholas authored and FMX committed Apr 1, 2024
1 parent 82022a9 commit af5c506
Showing 1 changed file with 1 addition and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
/** Number of max concurrent reading channels. */
protected final int numConcurrentReading;

/** Codec used for compression / decompression. */
protected static final String compressionCodec = "LZ4";

/** Network buffer size. */
protected final int networkBufferSize;

Expand Down Expand Up @@ -94,7 +91,7 @@ public IndexedInputGate create(
SupplierWithException<BufferPool, IOException> bufferPoolFactory =
createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
new BufferDecompressor(networkBufferSize, celebornConf.shuffleCompressionCodec().name());

return createInputGate(owningTaskName, gateIndex, igdd, bufferPoolFactory, bufferDecompressor);
}
Expand Down

0 comments on commit af5c506

Please sign in to comment.