From dcc3da14fb55133cabe6bb60fb6f7096d318e9d1 Mon Sep 17 00:00:00 2001 From: Jonas Irgens Kylling Date: Wed, 14 Feb 2024 20:35:24 +0100 Subject: [PATCH] Test incomplete stream reads in Alluxio caching Adds tests coverage for the bug fixed in b2df2f3f8ef312527ebb1a0b3ce2f09e2aa91ea1 --- .../IncompleteStreamMemoryFileSystem.java | 132 ++++++++++++++++++ .../alluxio/TestAlluxioCacheFileSystem.java | 2 +- .../TestFuzzAlluxioCacheFileSystem.java | 4 +- 3 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/IncompleteStreamMemoryFileSystem.java diff --git a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/IncompleteStreamMemoryFileSystem.java b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/IncompleteStreamMemoryFileSystem.java new file mode 100644 index 000000000000..4d789b5be23f --- /dev/null +++ b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/IncompleteStreamMemoryFileSystem.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; +import io.trino.filesystem.memory.MemoryFileSystem; + +import java.io.IOException; +import java.time.Instant; +import java.util.Random; + +import static java.util.Objects.requireNonNull; + +/** + * Simulates a file system where TrinoInputStream.read(buff, off, len) can return < len bytes. + */ +class IncompleteStreamMemoryFileSystem + extends MemoryFileSystem +{ + @Override + public TrinoInputFile newInputFile(Location location) + { + return new IncompleteStreamInputFile(super.newInputFile(location)); + } + + private static class IncompleteStreamInputFile + implements TrinoInputFile + { + private final TrinoInputFile delegate; + + public IncompleteStreamInputFile(TrinoInputFile delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public TrinoInput newInput() + throws IOException + { + return delegate.newInput(); + } + + @Override + public TrinoInputStream newStream() + throws IOException + { + return new IncompleteTrinoInputStream(delegate.newStream()); + } + + @Override + public long length() + throws IOException + { + return delegate.length(); + } + + @Override + public Instant lastModified() + throws IOException + { + return delegate.lastModified(); + } + + @Override + public boolean exists() + throws IOException + { + return delegate.exists(); + } + + @Override + public Location location() + { + return delegate.location(); + } + } + + static class IncompleteTrinoInputStream + extends TrinoInputStream + { + private final TrinoInputStream delegate; + private final Random random; + + public IncompleteTrinoInputStream(TrinoInputStream delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.random = new Random(42); + } + + @Override + public int read() + throws IOException + { + return delegate.read(); + } + + @Override + public int read(byte[] buff, int off, int len) + throws IOException + { + return delegate.read(buff, off, random.nextInt(0, len + 1)); + } + + @Override + public long getPosition() + throws IOException + { + return delegate.getPosition(); + } + + @Override + public void seek(long position) + throws IOException + { + delegate.seek(position); + } + } +} diff --git a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java index 8704d4c5a356..b84929ebb6bb 100644 --- a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java +++ b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java @@ -53,7 +53,7 @@ void beforeAll() .setCachePageSize(DataSize.valueOf("32003B")) .disableTTL() .setMaxCacheSizes("100MB"); - memoryFileSystem = new MemoryFileSystem(); + memoryFileSystem = new IncompleteStreamMemoryFileSystem(); cache = new AlluxioFileSystemCache(noopTracer(), configuration, new AlluxioCacheStats()); fileSystem = new CacheFileSystem(memoryFileSystem, cache, new DefaultCacheKeyProvider()); } diff --git a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestFuzzAlluxioCacheFileSystem.java b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestFuzzAlluxioCacheFileSystem.java index 8ca05b3d4232..5a8b98c93555 100644 --- a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestFuzzAlluxioCacheFileSystem.java +++ b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestFuzzAlluxioCacheFileSystem.java @@ -183,10 +183,8 @@ public TrinoFileSystem create() .setCachePageSize(DataSize.ofBytes(PAGE_SIZE)) .disableTTL() .setMaxCacheSizes(CACHE_SIZE + "B"); - MemoryFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); AlluxioFileSystemCache alluxioCache = new AlluxioFileSystemCache(Tracing.noopTracer(), configuration, new AlluxioCacheStats()); - return new CacheFileSystem(fileSystemFactory.create(ConnectorIdentity.ofUser("hello")), - alluxioCache, new TestingCacheKeyProvider()); + return new CacheFileSystem(new IncompleteStreamMemoryFileSystem(), alluxioCache, new TestingCacheKeyProvider()); } @Override