diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemLayout.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemLayout.java new file mode 100644 index 000000000000..861d738f3a98 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemLayout.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import io.trino.filesystem.Location; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +public interface FileSystemLayout +{ + Location location(Location rootLocation, FileSystemSpooledSegmentHandle segmentHandle); + + List searchPaths(Location rootLocation); + + Optional getExpiration(Location location); +} diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java index 3595bc9ee5be..03022d4083b7 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java @@ -34,7 +34,6 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; -import static io.trino.spooling.filesystem.FileSystemSpooledSegmentHandle.getExpirationFromLocation; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -43,25 +42,27 @@ public class FileSystemSegmentPruner private final Logger log = Logger.get(FileSystemSegmentPruner.class); private final TrinoFileSystem fileSystem; + private final FileSystemLayout layout; private final ScheduledExecutorService executor; private final boolean enabled; private final Duration interval; - private final Location location; + private final Location rootLocation; private final long batchSize; private boolean closed; private boolean filesAreOrdered = true; @Inject - public FileSystemSegmentPruner(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, @ForSegmentPruner ScheduledExecutorService executor) + public FileSystemSegmentPruner(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout layout, @ForSegmentPruner ScheduledExecutorService executor) { this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null") .create(ConnectorIdentity.ofUser("ignored")); + this.layout = requireNonNull(layout, "layout is null"); this.executor = requireNonNull(executor, "executor is null"); this.enabled = config.isPruningEnabled(); this.interval = config.getPruningInterval(); this.batchSize = config.getPruningBatchSize(); - this.location = Location.of(config.getLocation()); + this.rootLocation = Location.of(config.getLocation()); } @PostConstruct @@ -95,6 +96,17 @@ long pruneExpiredBefore(Instant expiredBefore) if (closed) { return 0; } + + long pruned = 0; + for (Location location : layout.searchPaths(rootLocation)) { + log.debug("Pruning location: %s", location); + pruned += doPrune(expiredBefore, location); + } + return pruned; + } + + private long doPrune(Instant expiredBefore, Location location) + { long pruned = 0; try { @@ -102,7 +114,7 @@ long pruneExpiredBefore(Instant expiredBefore) FileIterator iterator = orderDetectingIterator(fileSystem.listFiles(location)); while (iterator.hasNext()) { FileEntry file = iterator.next(); - Optional handle = getExpirationFromLocation(file.location()); + Optional handle = layout.getExpiration(file.location()); // Not a spooled segment if (handle.isEmpty()) { continue; @@ -141,9 +153,9 @@ private void pruneExpiredSegments(Instant expiredBefore, List expiredS try { int batchSize = expiredSegments.size(); - Instant oldest = getExpirationFromLocation(expiredSegments.getFirst()) + Instant oldest = layout.getExpiration(expiredSegments.getFirst()) .orElseThrow(() -> new IllegalStateException("No expiration time found for " + expiredSegments.getFirst())); - Instant newest = getExpirationFromLocation(expiredSegments.getLast()) + Instant newest = layout.getExpiration(expiredSegments.getLast()) .orElseThrow(() -> new IllegalStateException("No expiration time found for " + expiredSegments.getLast())); fileSystem.deleteFiles(expiredSegments); log.info("Pruned %d segments expired before %s [oldest: %s, newest: %s]", batchSize, expiredBefore, oldest, newest); diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java index d86bbad502f4..9bc1717d582c 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java @@ -14,7 +14,6 @@ package io.trino.spooling.filesystem; import io.azam.ulidj.ULID; -import io.trino.filesystem.Location; import io.trino.filesystem.encryption.EncryptionKey; import io.trino.spi.QueryId; import io.trino.spi.protocol.SpooledSegmentHandle; @@ -35,8 +34,6 @@ public record FileSystemSpooledSegmentHandle( Optional encryptionKey) implements SpooledSegmentHandle { - private static final String OBJECT_NAME_SEPARATOR = "-"; - public FileSystemSpooledSegmentHandle { requireNonNull(queryId, "queryId is null"); @@ -65,39 +62,18 @@ public Instant expirationTime() } /** - * Storage object name starts with the ULID which is ordered lexicographically - * by the time of the expiration, which makes it possible to find the expired + * Storage identifiers are ULIDs which are ordered lexicographically + * by the time of the expiration. This makes it possible to find the expired * segments by listing the storage objects. This is crucial for the storage * cleanup process to be able to efficiently delete the expired segments. * * @return String lexicographically sortable storage object name * @see ULID specification */ - public String storageObjectName() - { - return ULID.fromBinary(uuid) + OBJECT_NAME_SEPARATOR + queryId; - } - @Override public String identifier() { - return ULID.fromBinary(uuid) + OBJECT_NAME_SEPARATOR + queryId + "." + encoding; - } - - public static Optional getExpirationFromLocation(Location location) - { - String filename = location.fileName(); - int index = filename.indexOf(OBJECT_NAME_SEPARATOR); - if (index == -1) { - return Optional.empty(); - } - - String uuid = filename.substring(0, index); - if (!ULID.isValid(uuid)) { - return Optional.empty(); - } - - return Optional.of(Instant.ofEpochMilli(ULID.getTimestamp(uuid))); + return ULID.fromBinary(uuid); } private static byte[] entropy(Random random) @@ -115,6 +91,7 @@ public String toString() .add("encoding", encoding) .add("expires", Instant.ofEpochMilli(ULID.getTimestampBinary(uuid))) .add("identifier", identifier()) + .add("encoding", encoding) .add("encryptionKey", encryptionKey.map(_ -> "[redacted]").orElse("[none")) .toString(); } diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingConfig.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingConfig.java index 67568cbf48a1..74a9e523a4ca 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingConfig.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingConfig.java @@ -18,6 +18,7 @@ import io.airlift.units.Duration; import jakarta.validation.constraints.AssertTrue; +import static io.trino.spooling.filesystem.FileSystemSpoolingConfig.Layout.SIMPLE; import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MINUTES; @@ -27,6 +28,7 @@ public class FileSystemSpoolingConfig private boolean s3Enabled; private boolean gcsEnabled; private String location; + private Layout layout = SIMPLE; private Duration ttl = new Duration(12, HOURS); private boolean encryptionEnabled = true; private boolean pruningEnabled = true; @@ -81,6 +83,19 @@ public FileSystemSpoolingConfig setLocation(String location) return this; } + public Layout getLayout() + { + return layout; + } + + @Config("fs.layout") + @ConfigDescription("Spooled segments filesystem layout") + public FileSystemSpoolingConfig setLayout(Layout layout) + { + this.layout = layout; + return this; + } + public Duration getTtl() { return ttl; @@ -157,4 +172,10 @@ public boolean locationEndsWithSlash() { return location.endsWith("/"); } + + public enum Layout + { + SIMPLE, + PARTITIONED + } } diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java index dd86fe5587bb..8ba03002b836 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java @@ -58,17 +58,19 @@ public class FileSystemSpoolingManager private final Location location; private final EncryptionHeadersTranslator encryptionHeadersTranslator; private final TrinoFileSystem fileSystem; + private final FileSystemLayout fileSystemLayout; private final Duration ttl; private final boolean encryptionEnabled; private final Random random = ThreadLocalRandom.current(); @Inject - public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory) + public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout fileSystemLayout) { requireNonNull(config, "config is null"); this.location = Location.of(config.getLocation()); this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null") .create(ConnectorIdentity.ofUser("ignored")); + this.fileSystemLayout = requireNonNull(fileSystemLayout, "fileSystemLayout is null"); this.encryptionHeadersTranslator = encryptionHeadersTranslator(location); this.ttl = config.getTtl(); this.encryptionEnabled = config.isEncryptionEnabled(); @@ -79,7 +81,7 @@ public OutputStream createOutputStream(SpooledSegmentHandle handle) throws IOException { FileSystemSpooledSegmentHandle fileHandle = (FileSystemSpooledSegmentHandle) handle; - Location storageLocation = location(fileHandle); + Location storageLocation = fileSystemLayout.location(location, fileHandle); Optional encryption = fileHandle.encryptionKey(); TrinoOutputFile outputFile; @@ -109,8 +111,9 @@ public InputStream openInputStream(SpooledSegmentHandle handle) { FileSystemSpooledSegmentHandle fileHandle = (FileSystemSpooledSegmentHandle) handle; checkExpiration(fileHandle); + Optional encryption = fileHandle.encryptionKey(); - Location storageLocation = location(fileHandle); + Location storageLocation = fileSystemLayout.location(location, fileHandle); TrinoInputFile inputFile; @@ -129,7 +132,7 @@ public InputStream openInputStream(SpooledSegmentHandle handle) public void acknowledge(SpooledSegmentHandle handle) throws IOException { - fileSystem.deleteFile(location((FileSystemSpooledSegmentHandle) handle)); + fileSystem.deleteFile(fileSystemLayout.location(location, (FileSystemSpooledSegmentHandle) handle)); } @Override @@ -137,7 +140,7 @@ public Optional directLocation(SpooledSegmentHandle handle) throws IOException { FileSystemSpooledSegmentHandle fileHandle = (FileSystemSpooledSegmentHandle) handle; - Location storageLocation = location(fileHandle); + Location storageLocation = fileSystemLayout.location(location, fileHandle); Duration ttl = remainingTtl(fileHandle.expirationTime()); Optional key = fileHandle.encryptionKey(); @@ -207,14 +210,8 @@ public SpooledSegmentHandle handle(SpooledLocation location) return new FileSystemSpooledSegmentHandle(encoding, queryId, uuid, Optional.of(encryptionHeadersTranslator.extractKey(location.headers()))); } - private Location location(FileSystemSpooledSegmentHandle handle) - throws IOException - { - checkExpiration(handle); - return location.appendPath(handle.storageObjectName()); - } - private Duration remainingTtl(Instant expiresAt) + { return new Duration(between(Instant.now(), expiresAt).toMillis(), MILLISECONDS); } diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingModule.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingModule.java index 3480a6c9867f..78a9e8ddc655 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingModule.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingModule.java @@ -39,7 +39,11 @@ import static com.google.inject.multibindings.MapBinder.newMapBinder; import static io.airlift.concurrent.Threads.threadsNamed; +import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.base.ClosingBinder.closingBinder; +import static io.trino.spooling.filesystem.FileSystemSpoolingConfig.Layout.PARTITIONED; +import static io.trino.spooling.filesystem.FileSystemSpoolingConfig.Layout.SIMPLE; public class FileSystemSpoolingModule extends AbstractConfigurationAwareModule @@ -78,6 +82,19 @@ protected void setup(Binder binder) closingBinder(binder).registerExecutor(Key.get(ScheduledExecutorService.class, ForSegmentPruner.class)); } + + install(conditionalModule( + FileSystemSpoolingConfig.class, + fileSystemConfig -> fileSystemConfig.getLayout() == SIMPLE, + layoutBinder -> layoutBinder.bind(FileSystemLayout.class).to(SimpleFileSystemLayout.class))); + + install(conditionalModule( + FileSystemSpoolingConfig.class, + fileSystemConfig -> fileSystemConfig.getLayout() == PARTITIONED, + layoutBinder -> { + configBinder(layoutBinder).bindConfig(PartitionedLayoutConfig.class); + layoutBinder.bind(FileSystemLayout.class).to(PartitionedFileSystemLayout.class); + })); } @Provides diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/PartitionedFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/PartitionedFileSystemLayout.java new file mode 100644 index 000000000000..c2bcef59c015 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/PartitionedFileSystemLayout.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import io.trino.filesystem.Location; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class PartitionedFileSystemLayout + extends SimpleFileSystemLayout +{ + private final int partitions; + + public PartitionedFileSystemLayout(PartitionedLayoutConfig layoutConfig) + { + this.partitions = requireNonNull(layoutConfig, "layoutConfig is null").getPartitions(); + } + + @Override + public Location location(Location rootLocation, FileSystemSpooledSegmentHandle segmentHandle) + { + return super.location(rootLocation.appendPath(partition(segmentHandle)), segmentHandle); + } + + @Override + public List searchPaths(Location rootLocation) + { + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(partitions); + for (int i = 0; i < partitions; i++) { + builder.add(rootLocation.appendPath(partition(i))); + } + return builder.build(); + } + + private String partition(FileSystemSpooledSegmentHandle handle) + { + HashCode hashCode = Hashing.murmur3_32_fixed().hashBytes(handle.uuid()); + return partition(Hashing.consistentHash(hashCode, partitions)); + } + + private String partition(int partition) + { + return Strings.padEnd(Integer.toString(partition), 3, '0') + "-spooled"; + } +} diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/PartitionedLayoutConfig.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/PartitionedLayoutConfig.java new file mode 100644 index 000000000000..7161544136fd --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/PartitionedLayoutConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; + +public class PartitionedLayoutConfig +{ + private int partitions = 32; + + @Min(2) + @Max(1024) + public int getPartitions() + { + return partitions; + } + + @ConfigDescription("Number of file system partitions to use") + @Config("fs.layout.partitions") + public PartitionedLayoutConfig setPartitions(int partitions) + { + this.partitions = partitions; + return this; + } +} diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java new file mode 100644 index 000000000000..41affc111ac5 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import io.azam.ulidj.ULID; +import io.trino.filesystem.Location; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +public class SimpleFileSystemLayout + implements FileSystemLayout +{ + private static final String SEGMENT_SEPARATOR = ".segment."; + + @Override + public Location location(Location rootLocation, FileSystemSpooledSegmentHandle segmentHandle) + { + return rootLocation.appendPath(segmentHandle.identifier() + SEGMENT_SEPARATOR + segmentHandle.queryId() + "." + segmentHandle.encoding()); + } + + @Override + public List searchPaths(Location rootLocation) + { + return List.of(rootLocation); + } + + @Override + public Optional getExpiration(Location location) + { + String filename = location.fileName(); + int index = filename.indexOf(SEGMENT_SEPARATOR); + if (index == -1) { + return Optional.empty(); // Not a segment + } + + String uuid = filename.substring(0, index); + if (!ULID.isValid(uuid)) { + return Optional.empty(); + } + return Optional.of(Instant.ofEpochMilli(ULID.getTimestamp(uuid))); + } +} diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java similarity index 83% rename from plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java rename to plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java index 7b92b07d430b..292a6842048c 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java @@ -22,13 +22,13 @@ import io.trino.spi.QueryId; import io.trino.spi.protocol.SpoolingContext; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import java.io.IOException; import java.io.OutputStream; import java.io.UncheckedIOException; import java.time.Instant; import java.util.List; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -36,26 +36,26 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.time.temporal.ChronoUnit.MILLIS; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; -@TestInstance(PER_CLASS) -class TestFileSystemSegmentPruner +abstract class AbstractFileSystemSegmentPrunerTest { - private static final String TEST_LOCATION = "memory://"; + private static final Location TEST_LOCATION = Location.of("memory://"); private static final FileSystemSpoolingConfig SPOOLING_CONFIG = new FileSystemSpoolingConfig() - .setLocation(TEST_LOCATION) + .setLocation(TEST_LOCATION.toString()) .setPruningBatchSize(1); + protected abstract FileSystemLayout layout(); + @Test public void shouldPruneExpiredSegments() { MemoryFileSystem fileSystem = new MemoryFileSystem(); try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, layout(), executorService); Instant now = Instant.now(); - QueryId queryId = QueryId.valueOf("prune_expired"); + QueryId queryId = randomQueryId("prune_expired"); writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); Location nonExpiredSegment = writeDataSegment(fileSystem, queryId, now.plusSeconds(1)); @@ -74,10 +74,10 @@ public void shouldPruneExpiredSegmentsOnceAndClear() { MemoryFileSystem fileSystem = new MemoryFileSystem(); try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, layout(), executorService); Instant now = Instant.now(); - QueryId queryId = QueryId.valueOf("prune_expired"); + QueryId queryId = randomQueryId("prune_expired"); writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); @@ -100,11 +100,11 @@ public void shouldNotPruneLiveSegments() { MemoryFileSystem fileSystem = new MemoryFileSystem(); try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, layout(), executorService); Instant now = Instant.now(); - QueryId queryId = QueryId.valueOf("prune_live"); + QueryId queryId = randomQueryId("prune_live"); writeDataSegment(fileSystem, queryId, now.plusSeconds(1)); writeDataSegment(fileSystem, queryId, now.plusSeconds(2)); @@ -121,12 +121,13 @@ public void shouldNotPruneLiveSegments() public void shouldNotPruneSegmentsIfNotStrictlyBeforeExpiration() { TrinoFileSystem memoryFileSystem = new MemoryFileSystem(); + try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> memoryFileSystem, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> memoryFileSystem, layout(), executorService); Instant now = Instant.now(); - QueryId queryId = QueryId.valueOf("prune_now"); + QueryId queryId = randomQueryId("prune_now"); Location firstSegment = writeDataSegment(memoryFileSystem, queryId, now); Location secondSegment = writeDataSegment(memoryFileSystem, queryId, now); @@ -144,7 +145,7 @@ private Location writeDataSegment(TrinoFileSystem fileSystem, QueryId queryId, I { SpoolingContext context = new SpoolingContext("encoding", queryId, 100, 1000); FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(ThreadLocalRandom.current(), context, ttl); - Location location = Location.of(TEST_LOCATION).appendPath(handle.storageObjectName()); + Location location = layout().location(TEST_LOCATION, handle); try (OutputStream stream = fileSystem.newOutputFile(location).create()) { stream.write("dummy".getBytes(UTF_8)); return location; @@ -159,10 +160,10 @@ private List listFiles(TrinoFileSystem fileSystem, QueryId queryId) ImmutableList.Builder files = ImmutableList.builder(); try { - FileIterator iterator = fileSystem.listFiles(Location.of(TEST_LOCATION)); + FileIterator iterator = fileSystem.listFiles(TEST_LOCATION); while (iterator.hasNext()) { FileEntry entry = iterator.next(); - if (entry.location().fileName().endsWith(queryId.toString())) { + if (entry.location().fileName().contains(queryId.toString())) { files.add(entry.location()); } } @@ -172,4 +173,9 @@ private List listFiles(TrinoFileSystem fileSystem, QueryId queryId) throw new UncheckedIOException(e); } } + + private static QueryId randomQueryId(String name) + { + return QueryId.valueOf(name + "_" + UUID.randomUUID().toString().replace("-", "")); + } } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java index 2398d2568e27..7cc37f32c597 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java @@ -87,7 +87,7 @@ public void testHandleRoundTrip() FileSystemSpooledSegmentHandle handle2 = (FileSystemSpooledSegmentHandle) getSpoolingManager().handle(location); assertThat(handle.queryId()).isEqualTo(handle2.queryId()); - assertThat(handle.storageObjectName()).isEqualTo(handle2.storageObjectName()); + assertThat(handle.identifier()).isEqualTo(handle2.identifier()); assertThat(handle.uuid()).isEqualTo(handle2.uuid()); assertThat(handle.expirationTime()).isEqualTo(handle2.expirationTime()); assertThat(handle2.encryptionKey()).isPresent().hasValue(key); diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java index 594724f0086f..f77d030ad1f9 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java @@ -34,12 +34,12 @@ class TestFileSystemSpooledSegmentHandle .truncatedTo(MILLIS); // ULID retains millisecond precision @Test - public void testStorageObjectNameStability() + public void testStorageIdentifierStability() { Instant expireAt = Instant.ofEpochMilli(90000); FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(new NotARandomAtAll(), context, expireAt); - assertThat(handle.storageObjectName()) - .isEqualTo("0000002QWG0G2081040G208104-query_id"); + assertThat(handle.identifier()) + .isEqualTo("0000002QWG0G2081040G208104"); } @Test @@ -49,16 +49,16 @@ public void testLexicalOrdering() FileSystemSpooledSegmentHandle handle2 = FileSystemSpooledSegmentHandle.random(random, context, now.plusMillis(3)); FileSystemSpooledSegmentHandle handle3 = FileSystemSpooledSegmentHandle.random(random, context, now.plusMillis(2)); - assertThat(handle2.storageObjectName()) - .isGreaterThan(handle1.storageObjectName()); + assertThat(handle2.identifier()) + .isGreaterThan(handle1.identifier()); - assertThat(handle3.storageObjectName()) - .isLessThan(handle2.storageObjectName()) - .isGreaterThan(handle1.storageObjectName()); + assertThat(handle3.identifier()) + .isLessThan(handle2.identifier()) + .isGreaterThan(handle1.identifier()); - assertThat(handle1.storageObjectName()) - .isLessThan(handle2.storageObjectName()) - .isLessThan(handle3.storageObjectName()); + assertThat(handle1.identifier()) + .isLessThan(handle2.identifier()) + .isLessThan(handle3.identifier()); } private static class NotARandomAtAll diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingConfig.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingConfig.java index 5bc36cbe9479..1176939a87c1 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingConfig.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingConfig.java @@ -23,6 +23,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.trino.spooling.filesystem.FileSystemSpoolingConfig.Layout.PARTITIONED; +import static io.trino.spooling.filesystem.FileSystemSpoolingConfig.Layout.SIMPLE; class TestFileSystemSpoolingConfig { @@ -34,6 +36,7 @@ public void testDefaults() .setGcsEnabled(false) .setS3Enabled(false) .setLocation(null) + .setLayout(SIMPLE) .setEncryptionEnabled(true) .setTtl(new Duration(12, TimeUnit.HOURS)) .setPruningEnabled(true) @@ -49,6 +52,7 @@ public void testExplicitPropertyMappings() .put("fs.gcs.enabled", "true") .put("fs.s3.enabled", "true") .put("fs.location", "test") + .put("fs.layout", "PARTITIONED") .put("fs.segment.encryption", "false") .put("fs.segment.ttl", "1h") .put("fs.segment.pruning.enabled", "false") @@ -61,6 +65,7 @@ public void testExplicitPropertyMappings() .setGcsEnabled(true) .setS3Enabled(true) .setLocation("test") + .setLayout(PARTITIONED) .setEncryptionEnabled(false) .setTtl(new Duration(1, TimeUnit.HOURS)) .setPruningEnabled(false) diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java index 8be8f4fbe889..9d148e3b9e1d 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java @@ -62,7 +62,7 @@ protected SpoolingManager getSpoolingManager() .setAwsAccessKey(LOCALSTACK.getAccessKey()) .setAwsSecretKey(LOCALSTACK.getSecretKey()) .setStreamingPartSize(DataSize.valueOf("5.5MB")); - return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats())); + return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()), new SimpleFileSystemLayout()); } protected S3Client createS3Client() diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java index 1960377ee006..a8c2f4f1c2a3 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java @@ -63,6 +63,6 @@ protected SpoolingManager getSpoolingManager() .setAwsAccessKey(Minio.MINIO_ACCESS_KEY) .setAwsSecretKey(Minio.MINIO_SECRET_KEY) .setStreamingPartSize(DataSize.valueOf("5.5MB")); - return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats())); + return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()), new SimpleFileSystemLayout()); } } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java new file mode 100644 index 000000000000..3edbc656877f --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import io.azam.ulidj.ULID; +import io.trino.filesystem.Location; +import io.trino.spi.QueryId; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +class TestPartitionedFileSystemLayout +{ + private static final byte[] STATIC_ENTROPY = "notsorandombytes".getBytes(UTF_8); + private static final FileSystemLayout LAYOUT = new PartitionedFileSystemLayout(new PartitionedLayoutConfig().setPartitions(7)); + private static final Location ROOT_LOCATION = Location.of("memory://root/"); + + @Test + public void testStorageLocation() + { + FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", QueryId.valueOf("query_id"), ULID.generateBinary(213700331, STATIC_ENTROPY), Optional.empty()); + + assertThat(handle.identifier()).isEqualTo("00006BSKQBDSQQ8WVFE9GPWS3F"); + + Location segmentLocation = LAYOUT.location(ROOT_LOCATION, handle); + + assertThat(segmentLocation).isEqualTo(ROOT_LOCATION + .appendPath("600-spooled") + .appendPath("00006BSKQBDSQQ8WVFE9GPWS3F.segment.query_id.json")); + + assertThat(segmentLocation.fileName()).isEqualTo("00006BSKQBDSQQ8WVFE9GPWS3F.segment.query_id.json"); + assertThat(LAYOUT.getExpiration(segmentLocation)).hasValue(Instant.ofEpochMilli(213700331)); + } + + @Test + public void testSearchPaths() + { + assertThat(LAYOUT.searchPaths(ROOT_LOCATION)).containsOnly( + ROOT_LOCATION.appendPath("000-spooled"), + ROOT_LOCATION.appendPath("100-spooled"), + ROOT_LOCATION.appendPath("200-spooled"), + ROOT_LOCATION.appendPath("300-spooled"), + ROOT_LOCATION.appendPath("400-spooled"), + ROOT_LOCATION.appendPath("500-spooled"), + ROOT_LOCATION.appendPath("600-spooled")); + } + + @Test + public void testExpirationForNonSegment() + { + Location fileLocation = ROOT_LOCATION.appendPath("not_a_segment.json"); + assertThat(LAYOUT.getExpiration(fileLocation)).isEmpty(); + } +} diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedLayoutConfig.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedLayoutConfig.java new file mode 100644 index 000000000000..231eba90fdfb --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedLayoutConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +class TestPartitionedLayoutConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(PartitionedLayoutConfig.class) + .setPartitions(32)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("fs.layout.partitions", "64") + .buildOrThrow(); + + PartitionedLayoutConfig expected = new PartitionedLayoutConfig() + .setPartitions(64); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedLayoutFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedLayoutFileSystemSegmentPruner.java new file mode 100644 index 000000000000..2d9ee87aff72 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedLayoutFileSystemSegmentPruner.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import org.junit.jupiter.api.TestInstance; + +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +public class TestPartitionedLayoutFileSystemSegmentPruner + extends AbstractFileSystemSegmentPrunerTest +{ + @Override + protected FileSystemLayout layout() + { + return new PartitionedFileSystemLayout(new PartitionedLayoutConfig() + .setPartitions(7)); + } +} diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java new file mode 100644 index 000000000000..61193e765fc3 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import io.azam.ulidj.ULID; +import io.trino.filesystem.Location; +import io.trino.spi.QueryId; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +class TestSimpleFileSystemLayout +{ + private static final byte[] STATIC_ENTROPY = "notsorandombytes".getBytes(UTF_8); + private static final FileSystemLayout LAYOUT = new SimpleFileSystemLayout(); + private static final Location ROOT_LOCATION = Location.of("memory://root/"); + + @Test + public void testStorageLocation() + { + FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", QueryId.valueOf("query_id"), ULID.generateBinary(21370000, STATIC_ENTROPY), Optional.empty()); + + assertThat(handle.identifier()).isEqualTo("00000MC54GDSQQ8WVFE9GPWS3F"); + + Location segmentLocation = LAYOUT.location(ROOT_LOCATION, handle); + + assertThat(segmentLocation).isEqualTo(ROOT_LOCATION.appendPath("00000MC54GDSQQ8WVFE9GPWS3F.segment.query_id.json")); + assertThat(segmentLocation.fileName()).isEqualTo("00000MC54GDSQQ8WVFE9GPWS3F.segment.query_id.json"); + assertThat(LAYOUT.getExpiration(segmentLocation)).hasValue(Instant.ofEpochMilli(21370000)); + } + + @Test + public void testSearchPaths() + { + assertThat(LAYOUT.searchPaths(ROOT_LOCATION)).containsOnly(ROOT_LOCATION); + } + + @Test + public void testExpirationForNonSegment() + { + Location fileLocation = ROOT_LOCATION.appendPath("not_a_segment.json"); + assertThat(LAYOUT.getExpiration(fileLocation)).isEmpty(); + } +} diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleLayoutFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleLayoutFileSystemSegmentPruner.java new file mode 100644 index 000000000000..1d72afe12722 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleLayoutFileSystemSegmentPruner.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import org.junit.jupiter.api.TestInstance; + +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +public class TestSimpleLayoutFileSystemSegmentPruner + extends AbstractFileSystemSegmentPrunerTest +{ + @Override + protected FileSystemLayout layout() + { + return new SimpleFileSystemLayout(); + } +}