Remove initial splits from delta splits generation #21320

Merged: 1 commit, merged on Mar 29, 2024
11 changes: 0 additions & 11 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -1086,17 +1086,6 @@ keep a backup of the original values if you change them.
     results in Trino maximizing the parallelization of data access by default.
     Attempting to set it higher results in Trino not being able to start.
   - `Integer.MAX_VALUE`
-* - `delta.max-initial-splits`
-  - For each query, the coordinator assigns file sections to read first at the
-    `initial-split-size` until the `max-initial-splits` is reached. Then it
-    starts issuing reads of the `max-split-size` size.
-  - `200`
-* - `delta.max-initial-split-size`
-  - Sets the initial [](prop-type-data-size) for a single read section
-    assigned to a worker until `max-initial-splits` have been processed. You can
-    also use the corresponding catalog session property
-    `<catalog-name>.max_initial_split_size`.
-  - `32MB`
 * - `delta.max-split-size`
   - Sets the largest [](prop-type-data-size) for a single read section
     assigned to a worker after `max-initial-splits` have been processed. You can
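The two rows removed here documented the old two-phase sizing: with the defaults of 200 initial splits at 32MB, each query read roughly its first 6.4GB in 32MB sections before switching to 64MB sections. After this change, every split is sized by `delta.max-split-size` from the start; a runnable sketch of the simplified loop follows the DeltaLakeSplitManager diff below.
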

DeltaLakeConfig.java

@@ -38,7 +38,10 @@

 @DefunctConfig({
         "delta.experimental.ignore-checkpoint-write-failures",
-        "delta.legacy-create-table-with-existing-location.enabled"})
+        "delta.legacy-create-table-with-existing-location.enabled",
+        "delta.max-initial-splits",
+        "delta.max-initial-split-size"
+})
 public class DeltaLakeConfig
 {
     public static final String EXTENDED_STATISTICS_ENABLED = "delta.extended-statistics.enabled";
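
Moving the two properties into @DefunctConfig means the Airlift configuration framework now rejects them outright: a catalog file that still sets, for example, delta.max-initial-splits=5 (the value is illustrative) fails at server startup with an error naming the defunct property, rather than being silently ignored.
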
@@ -56,8 +59,6 @@ public class DeltaLakeConfig
     private int domainCompactionThreshold = 1000;
     private int maxOutstandingSplits = 1_000;
     private int maxSplitsPerSecond = Integer.MAX_VALUE;
-    private int maxInitialSplits = 200;
-    private DataSize maxInitialSplitSize;
     private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
     private double minimumAssignedSplitWeight = 0.05;
     private int maxPartitionsPerWriter = 100;
@@ -176,34 +177,6 @@ public DeltaLakeConfig setMaxSplitsPerSecond(int maxSplitsPerSecond)
         return this;
     }

-    public int getMaxInitialSplits()
-    {
-        return maxInitialSplits;
-    }
-
-    @Config("delta.max-initial-splits")
-    public DeltaLakeConfig setMaxInitialSplits(int maxInitialSplits)
-    {
-        this.maxInitialSplits = maxInitialSplits;
-        return this;
-    }
-
-    @NotNull
-    public DataSize getMaxInitialSplitSize()
-    {
-        if (maxInitialSplitSize == null) {
-            return DataSize.ofBytes(maxSplitSize.toBytes() / 2).to(maxSplitSize.getUnit());
-        }
-        return maxInitialSplitSize;
-    }
-
-    @Config("delta.max-initial-split-size")
-    public DeltaLakeConfig setMaxInitialSplitSize(DataSize maxInitialSplitSize)
-    {
-        this.maxInitialSplitSize = maxInitialSplitSize;
-        return this;
-    }
-
     @NotNull
     public DataSize getMaxSplitSize()
     {
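Note the removed getter's fallback: when delta.max-initial-split-size was unset, it defaulted to half of max-split-size, which is where the 32MB default documented above came from (half of the 64MB default max split size).
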

DeltaLakeSessionProperties.java

@@ -51,7 +51,6 @@ public final class DeltaLakeSessionProperties
         implements SessionPropertiesProvider
 {
     public static final String MAX_SPLIT_SIZE = "max_split_size";
-    public static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
     public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
     private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
     private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
@@ -90,11 +89,6 @@ public DeltaLakeSessionProperties(
"Max split size",
deltaLakeConfig.getMaxSplitSize(),
true),
dataSizeProperty(
MAX_INITIAL_SPLIT_SIZE,
"Max initial split size",
deltaLakeConfig.getMaxInitialSplitSize(),
true),
durationProperty(
VACUUM_MIN_RETENTION,
"Minimal retention period for vacuum procedure",
@@ -244,11 +238,6 @@ public static DataSize getMaxSplitSize(ConnectorSession session)
         return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
     }

-    public static DataSize getMaxInitialSplitSize(ConnectorSession session)
-    {
-        return session.getProperty(MAX_INITIAL_SPLIT_SIZE, DataSize.class);
-    }
-
     public static Duration getVacuumMinRetention(ConnectorSession session)
     {
         return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
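With the session property and its accessor gone, per-query tuning is limited to the remaining max_split_size property, for example SET SESSION delta.max_split_size = '32MB'; (the delta catalog name here is illustrative). A SET SESSION on the old <catalog-name>.max_initial_split_size should now be rejected as an unknown session property.
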

DeltaLakeSplitManager.java

@@ -52,7 +52,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;

 import static com.google.common.base.Preconditions.checkArgument;
@@ -64,7 +63,6 @@
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
-import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
@@ -79,7 +77,6 @@ public class DeltaLakeSplitManager
     private final TypeManager typeManager;
     private final TransactionLogAccess transactionLogAccess;
     private final ExecutorService executor;
-    private final int maxInitialSplits;
     private final int maxSplitsPerSecond;
     private final int maxOutstandingSplits;
     private final double minimumAssignedSplitWeight;
@@ -100,7 +97,6 @@ public DeltaLakeSplitManager(
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
         this.executor = requireNonNull(executor, "executor is null");
-        this.maxInitialSplits = config.getMaxInitialSplits();
         this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
         this.maxOutstandingSplits = config.getMaxOutstandingSplits();
         this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
@@ -174,7 +170,6 @@ private Stream<DeltaLakeSplit> getSplits(
tableHandle.getWriteType().isEmpty() &&
// When only partitioning columns projected, there is no point splitting the files
mayAnyDataColumnProjected(tableHandle);
AtomicInteger remainingInitialSplits = new AtomicInteger(maxInitialSplits);
Optional<Instant> filesModifiedAfter = tableHandle.getAnalyzeHandle().flatMap(AnalyzeHandle::getFilesModifiedAfter);
Optional<Long> maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);

@@ -248,8 +243,7 @@ private Stream<DeltaLakeSplit> getSplits(
                             splitPath,
                             addAction.getCanonicalPartitionValues(),
                             statisticsPredicate,
-                            splittable,
-                            remainingInitialSplits)
+                            splittable)
                             .stream();
                 });
     }
@@ -317,8 +311,7 @@ private List<DeltaLakeSplit> splitsForFile(
             String splitPath,
             Map<String, Optional<String>> partitionKeys,
             TupleDomain<DeltaLakeColumnHandle> statisticsPredicate,
-            boolean splittable,
-            AtomicInteger remainingInitialSplits)
+            boolean splittable)
     {
         long fileSize = addFileEntry.getSize();

@@ -341,13 +334,7 @@
ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
long currentOffset = 0;
while (currentOffset < fileSize) {
long maxSplitSize;
if (remainingInitialSplits.get() > 0 && remainingInitialSplits.getAndDecrement() > 0) {
maxSplitSize = getMaxInitialSplitSize(session).toBytes();
}
else {
maxSplitSize = getMaxSplitSize(session).toBytes();
}
long maxSplitSize = getMaxSplitSize(session).toBytes();
long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);

splits.add(new DeltaLakeSplit(
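To make the new behavior concrete, here is a minimal, self-contained sketch of the per-file loop after this change. It is not the real DeltaLakeSplitManager code: the Split record and the plain long parameters are stand-ins for DeltaLakeSplit and the session lookup.

// Sketch of the simplified split sizing, under the assumptions above.
import java.util.ArrayList;
import java.util.List;

public class SplitSizingSketch
{
    record Split(long start, long length) {}

    static List<Split> splitsForFile(long fileSize, long maxSplitSize)
    {
        // Walk the file front to back, emitting splits capped at maxSplitSize.
        // The AtomicInteger countdown of "initial" splits is gone, so every
        // split in the file uses the same cap.
        List<Split> splits = new ArrayList<>();
        long currentOffset = 0;
        while (currentOffset < fileSize) {
            long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);
            splits.add(new Split(currentOffset, splitSize));
            currentOffset += splitSize;
        }
        return splits;
    }

    public static void main(String[] args)
    {
        // Mirrors testSplitSizes below: a 50,000-byte file with a 20,000-byte
        // max split size yields splits of 20,000, 20,000, and 10,000 bytes.
        splitsForFile(50_000, 20_000).forEach(split ->
                System.out.println(split.start() + " + " + split.length()));
    }
}
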

TestDeltaLakeConfig.java

@@ -47,8 +47,6 @@ public void testDefaults()
                 .setDomainCompactionThreshold(1000)
                 .setMaxSplitsPerSecond(Integer.MAX_VALUE)
                 .setMaxOutstandingSplits(1_000)
-                .setMaxInitialSplits(200)
-                .setMaxInitialSplitSize(DataSize.of(32, DataSize.Unit.MEGABYTE))
                 .setMaxSplitSize(DataSize.of(64, DataSize.Unit.MEGABYTE))
                 .setMinimumAssignedSplitWeight(0.05)
                 .setMaxPartitionsPerWriter(100)
@@ -85,8 +83,6 @@ public void testExplicitPropertyMappings()
.put("delta.domain-compaction-threshold", "500")
.put("delta.max-outstanding-splits", "200")
.put("delta.max-splits-per-second", "10")
.put("delta.max-initial-splits", "5")
.put("delta.max-initial-split-size", "1 GB")
.put("delta.max-split-size", "10 MB")
.put("delta.minimum-assigned-split-weight", "0.01")
.put("delta.max-partitions-per-writer", "200")
@@ -120,8 +116,6 @@ public void testExplicitPropertyMappings()
                 .setDomainCompactionThreshold(500)
                 .setMaxOutstandingSplits(200)
                 .setMaxSplitsPerSecond(10)
-                .setMaxInitialSplits(5)
-                .setMaxInitialSplitSize(DataSize.of(1, GIGABYTE))
                 .setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
                 .setMinimumAssignedSplitWeight(0.01)
                 .setMaxPartitionsPerWriter(200)

BaseDeltaLakeConnectorSmokeTest.java

@@ -324,7 +324,6 @@ public void testReadWholePartitionSplittableFile()

         Session session = Session.builder(getSession())
                 .setCatalogSessionProperty(catalog, DeltaLakeSessionProperties.MAX_SPLIT_SIZE, "1kB")
-                .setCatalogSessionProperty(catalog, DeltaLakeSessionProperties.MAX_INITIAL_SPLIT_SIZE, "1kB")
                 .build();

         // Read partition column only

TestDeltaLakeSplitManager.java

@@ -102,51 +102,22 @@ public class TestDeltaLakeSplitManager
     private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true);

     @Test
-    public void testInitialSplits()
-            throws ExecutionException, InterruptedException
-    {
-        long fileSize = 20_000;
-        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
-        DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
-                .setMaxInitialSplits(1000)
-                .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
-        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
-
-        DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
-        List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
-
-        List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));
-
-        assertThat(splits).isEqualTo(expected);
-    }
-
-    @Test
-    public void testNonInitialSplits()
+    public void testSplitSizes()
             throws ExecutionException, InterruptedException
     {
         long fileSize = 50_000;
         List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
-                .setMaxInitialSplits(5)
-                .setMaxInitialSplitSize(DataSize.ofBytes(5_000))
                 .setMaxSplitSize(DataSize.ofBytes(20_000));
         double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

         DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));
+                makeSplit(0, 20_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(20_000, 20_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(40_000, 10_000, fileSize, minimumAssignedSplitWeight));

         assertThat(splits).isEqualTo(expected);
     }
@@ -159,8 +130,6 @@ public void testSplitsFromMultipleFiles()
         long secondFileSize = 20_000;
         List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
-                .setMaxInitialSplits(3)
-                .setMaxInitialSplitSize(DataSize.ofBytes(2_000))
                 .setMaxSplitSize(DataSize.ofBytes(10_000));
         double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

@@ -169,10 +138,8 @@
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
makeSplit(0, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 10_000, secondFileSize, minimumAssignedSplitWeight));
assertThat(splits).isEqualTo(expected);
}
