Skip to content

Commit

Permalink
Add compression codec configuration for data in spills
Browse files Browse the repository at this point in the history
  • Loading branch information
hackeryang authored and raunaqmorarka committed Jan 19, 2024
1 parent efc56ba commit 1bb096b
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.log.Logger;
import io.trino.FeaturesConfig;
import io.trino.cache.NonKeyEvictableLoadingCache;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.SpillContext;
Expand All @@ -45,8 +46,6 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.FeaturesConfig.SPILLER_SPILL_PATH;
import static io.trino.cache.SafeCaches.buildNonEvictableCacheWithWeakInvalidateAll;
import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.spi.StandardErrorCode.OUT_OF_SPILL_SPACE;
import static io.trino.util.Ciphers.createRandomAesEncryptionKey;
import static java.lang.String.format;
Expand Down Expand Up @@ -94,7 +93,7 @@ public FileSingleStreamSpillerFactory(BlockEncodingSerde blockEncodingSerde, Spi
spillerStats,
featuresConfig.getSpillerSpillPaths(),
featuresConfig.getSpillMaxUsedSpaceThreshold(),
nodeSpillConfig.isSpillCompressionEnabled(),
nodeSpillConfig.getSpillCompressionCodec(),
nodeSpillConfig.isSpillEncryptionEnabled());
}

Expand All @@ -105,10 +104,10 @@ public FileSingleStreamSpillerFactory(
SpillerStats spillerStats,
List<Path> spillPaths,
double maxUsedSpaceThreshold,
boolean spillCompressionEnabled,
CompressionCodec compressionCodec,
boolean spillEncryptionEnabled)
{
this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, spillCompressionEnabled ? LZ4 : NONE);
this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, compressionCodec);
this.executor = requireNonNull(executor, "executor is null");
this.spillerStats = requireNonNull(spillerStats, "spillerStats cannot be null");
requireNonNull(spillPaths, "spillPaths is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@
package io.trino.spiller;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.trino.execution.buffer.CompressionCodec;
import jakarta.validation.constraints.NotNull;

import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;

@DefunctConfig("experimental.spill-compression-enabled")
public class NodeSpillConfig
{
private DataSize maxSpillPerNode = DataSize.of(100, DataSize.Unit.GIGABYTE);
private DataSize queryMaxSpillPerNode = DataSize.of(100, DataSize.Unit.GIGABYTE);

private boolean spillCompressionEnabled;
private CompressionCodec spillCompressionCodec = NONE;
private boolean spillEncryptionEnabled;

@NotNull
Expand Down Expand Up @@ -54,16 +61,24 @@ public NodeSpillConfig setQueryMaxSpillPerNode(DataSize queryMaxSpillPerNode)
return this;
}

public boolean isSpillCompressionEnabled()
@Deprecated
@LegacyConfig(value = "spill-compression-enabled", replacedBy = "spill-compression-codec")
public NodeSpillConfig setSpillCompressionEnabled(boolean spillCompressionEnabled)
{
return spillCompressionEnabled;
this.spillCompressionCodec = spillCompressionEnabled ? LZ4 : NONE;
return this;
}

@Config("spill-compression-enabled")
@LegacyConfig("experimental.spill-compression-enabled")
public NodeSpillConfig setSpillCompressionEnabled(boolean spillCompressionEnabled)
/**
 * Returns the compression codec configured for data in spills.
 * The field backing this getter is initialized to {@code NONE}.
 */
public CompressionCodec getSpillCompressionCodec()
{
return spillCompressionCodec;
}

@Config("spill-compression-codec")
@ConfigDescription("Compression codec used for data in spills")
public NodeSpillConfig setSpillCompressionCodec(CompressionCodec spillCompressionCodec)
{
this.spillCompressionEnabled = spillCompressionEnabled;
this.spillCompressionCodec = spillCompressionCodec;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.BlockEncodingSerde;
Expand Down Expand Up @@ -95,8 +96,8 @@ public static class BenchmarkData
@Param("10")
private int pagesCount = 10;

@Param("false")
private boolean compressionEnabled;
@Param("NONE")
private CompressionCodec compressionCodec;

@Param("true")
private boolean encryptionEnabled;
Expand All @@ -117,7 +118,7 @@ public void setup()
spillerStats,
ImmutableList.of(SPILL_PATH),
1.0,
compressionEnabled,
compressionCodec,
encryptionEnabled);
spillerFactory = new GenericSpillerFactory(singleStreamSpillerFactory);
pages = createInputPages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.operator.PageAssertions.assertPageEquals;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -85,7 +83,7 @@ public void setUp()
BlockEncodingSerde blockEncodingSerde = new TestingBlockEncodingSerde();
singleStreamSpillerFactory = new FileSingleStreamSpillerFactory(blockEncodingSerde, spillerStats, featuresConfig, nodeSpillConfig);
factory = new GenericSpillerFactory(singleStreamSpillerFactory);
PagesSerdeFactory pagesSerdeFactory = new PagesSerdeFactory(blockEncodingSerde, nodeSpillConfig.isSpillCompressionEnabled() ? LZ4 : NONE);
PagesSerdeFactory pagesSerdeFactory = new PagesSerdeFactory(blockEncodingSerde, nodeSpillConfig.getSpillCompressionCodec());
serializer = pagesSerdeFactory.createSerializer(Optional.empty());
memoryContext = newSimpleAggregatedMemoryContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.execution.buffer.PagesSerdeUtil;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.PageAssertions;
Expand All @@ -42,6 +43,8 @@
import static com.google.common.io.MoreFiles.listFiles;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.execution.buffer.PagesSerdeUtil.isSerializedPageCompressed;
import static io.trino.execution.buffer.PagesSerdeUtil.isSerializedPageEncrypted;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
Expand Down Expand Up @@ -83,31 +86,31 @@ public void tearDown()
public void testSpill()
throws Exception
{
assertSpill(false, false);
assertSpill(NONE, false);
}

@Test
public void testSpillCompression()
throws Exception
{
assertSpill(true, false);
assertSpill(LZ4, false);
}

@Test
public void testSpillEncryption()
throws Exception
{
assertSpill(false, true);
assertSpill(NONE, true);
}

@Test
public void testSpillEncryptionWithCompression()
throws Exception
{
assertSpill(true, true);
assertSpill(LZ4, true);
}

private void assertSpill(boolean compression, boolean encryption)
private void assertSpill(CompressionCodec compressionCodec, boolean encryption)
throws Exception
{
FileSingleStreamSpillerFactory spillerFactory = new FileSingleStreamSpillerFactory(
Expand All @@ -116,7 +119,7 @@ private void assertSpill(boolean compression, boolean encryption)
new SpillerStats(),
ImmutableList.of(spillPath.toPath()),
1.0,
compression,
compressionCodec,
encryption);
LocalMemoryContext memoryContext = newSimpleAggregatedMemoryContext().newLocalMemoryContext("test");
SingleStreamSpiller singleStreamSpiller = spillerFactory.create(TYPES, bytes -> {}, memoryContext);
Expand All @@ -138,7 +141,7 @@ private void assertSpill(boolean compression, boolean encryption)
.describedAs("at least one page should be successfully read back")
.isTrue();
Slice serializedPage = serializedPages.next();
assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compression);
assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compressionCodec == LZ4);
assertThat(isSerializedPageEncrypted(serializedPage)).isEqualTo(encryption);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static com.google.common.io.MoreFiles.listFiles;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.common.util.concurrent.Futures.getUnchecked;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_PREFIX;
Expand Down Expand Up @@ -279,7 +280,7 @@ private FileSingleStreamSpillerFactory spillerFactoryFactory(List<Path> paths, D
new SpillerStats(),
paths,
maxUsedSpaceThreshold,
false,
NONE,
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.execution.buffer.CompressionCodec.ZSTD;

public class TestNodeSpillConfig
{
Expand All @@ -33,7 +35,7 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(NodeSpillConfig.class)
.setMaxSpillPerNode(DataSize.of(100, GIGABYTE))
.setQueryMaxSpillPerNode(DataSize.of(100, GIGABYTE))
.setSpillCompressionEnabled(false)
.setSpillCompressionCodec(NONE)
.setSpillEncryptionEnabled(false));
}

Expand All @@ -43,14 +45,14 @@ public void testExplicitPropertyMappings()
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("max-spill-per-node", "10MB")
.put("query-max-spill-per-node", "15 MB")
.put("spill-compression-enabled", "true")
.put("spill-compression-codec", "ZSTD")
.put("spill-encryption-enabled", "true")
.buildOrThrow();

NodeSpillConfig expected = new NodeSpillConfig()
.setMaxSpillPerNode(DataSize.of(10, MEGABYTE))
.setQueryMaxSpillPerNode(DataSize.of(15, MEGABYTE))
.setSpillCompressionEnabled(true)
.setSpillCompressionCodec(ZSTD)
.setSpillEncryptionEnabled(true);

assertFullMapping(properties, expected);
Expand Down
10 changes: 9 additions & 1 deletion docs/src/main/sphinx/admin/properties-spilling.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ Limit for memory used for unspilling a single aggregation operator instance.
- **Type:** {ref}`prop-type-boolean`
- **Default value:** `false`

Enables data compression for pages spilled to disk.
Enables data compression for pages spilled to disk. This property is deprecated; use `spill-compression-codec` instead.

## `spill-compression-codec`

- **Type:** {ref}`prop-type-string`
- **Allowed values:** `NONE`, `LZ4`, `ZSTD`
- **Default value:** `NONE`

The compression codec to use when spilling pages to disk.

## `spill-encryption-enabled`

Expand Down

0 comments on commit 1bb096b

Please sign in to comment.