diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java index 5888ca44556d..2a4fd57d8dfd 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java @@ -17,10 +17,11 @@ import alluxio.Constants; import alluxio.client.file.CacheContext; -import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.conf.Configuration; import alluxio.conf.InstancedConfiguration; import alluxio.conf.PropertyKey; +import alluxio.file.ReadTargetBuffer; +import alluxio.network.protocol.databuffer.DataFileChannel; import alluxio.util.io.BufferUtils; import org.junit.Before; @@ -32,6 +33,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Optional; +import java.util.function.Supplier; /** * Tests for the {@link LocalCacheManager} class. @@ -228,7 +231,7 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { } @Override - public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer, + public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer, CacheContext cacheContext) { if (!mCache.containsKey(pageId)) { return 0; @@ -241,6 +244,29 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuf return bytesToRead; } + @Override + public void commitFile(String fileId) { + throw new UnsupportedOperationException("commitFile method is unsupported. "); + } + + @Override + public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead, + ReadTargetBuffer buffer, CacheContext cacheContext, + Supplier externalDataSupplier) { + int bytesRead = get(pageId, pageOffset, + bytesToRead, buffer, cacheContext); + if (bytesRead > 0) { + return bytesRead; + } + byte[] page = externalDataSupplier.get(); + if (page.length == 0) { + return 0; + } + buffer.writeBytes(page, pageOffset, bytesToRead); + put(pageId, page, cacheContext); + return bytesToRead; + } + @Override public boolean delete(PageId pageId) { if (mCache.containsKey(pageId)) { @@ -261,6 +287,27 @@ public boolean append(PageId pageId, int appendAt, byte[] page, CacheContext cac } @Override - public void close() throws Exception {} + public void deleteFile(String fileId) { + // no-op + } + + @Override + public void deleteTempFile(String fileId) { + // no-op + } + + @Override + public Optional getUsage() { + return Optional.empty(); + } + + @Override + public Optional getDataFileChannel(PageId pageId, int pageOffset, + int bytesToRead, CacheContext cacheContext) { + return Optional.empty(); + } + + @Override + public void close() {} } } diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/HangingPageStore.java b/core/client/fs/src/test/java/alluxio/client/file/cache/HangingPageStore.java index fde5fafbc3c2..9917f03c82f2 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/HangingPageStore.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/HangingPageStore.java @@ -12,9 +12,9 @@ package alluxio.client.file.cache; import alluxio.client.file.cache.store.LocalPageStore; -import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.client.file.cache.store.PageStoreOptions; import alluxio.exception.PageNotFoundException; +import alluxio.file.ReadTargetBuffer; import java.io.IOException; import java.nio.ByteBuffer; @@ -44,7 +44,7 @@ public void delete(PageId pageId) throws IOException, PageNotFoundException { } @Override - public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, boolean isTemporary) throws IOException, PageNotFoundException { checkStopHanging(); diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/TimeBoundPageStoreTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/TimeBoundPageStoreTest.java index 76cbae4ab79e..422517afad50 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/TimeBoundPageStoreTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/TimeBoundPageStoreTest.java @@ -20,12 +20,12 @@ import static org.junit.Assert.fail; import alluxio.Constants; -import alluxio.client.file.cache.store.ByteArrayTargetBuffer; import alluxio.client.file.cache.store.PageStoreOptions; import alluxio.conf.Configuration; import alluxio.conf.InstancedConfiguration; import alluxio.conf.PropertyKey; import alluxio.exception.PageNotFoundException; +import alluxio.file.ByteArrayTargetBuffer; import alluxio.util.io.BufferUtils; import org.junit.Before; diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java index 3c47646f64b9..d07c492ca4a3 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java @@ -19,6 +19,7 @@ import alluxio.client.file.cache.PageId; import alluxio.client.file.cache.PageStore; +import alluxio.file.ByteArrayTargetBuffer; import org.junit.Before; import org.junit.Rule; diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java index 44cae0a0234e..ed6f4b2aacb5 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java @@ -16,6 +16,7 @@ import alluxio.client.file.cache.PageId; import alluxio.client.file.cache.PageStore; +import alluxio.file.ByteArrayTargetBuffer; import org.junit.Before; import org.junit.Test; diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java index e4b5cc115159..40cfe243ce38 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java @@ -20,6 +20,7 @@ import alluxio.client.file.cache.PageId; import alluxio.client.file.cache.PageStore; import alluxio.exception.PageNotFoundException; +import alluxio.file.ByteArrayTargetBuffer; import alluxio.util.io.BufferUtils; import org.junit.After; @@ -40,7 +41,6 @@ public class PageStoreTest { @Parameterized.Parameters public static Collection data() { return Arrays.asList(new Object[][] { - {PageStoreType.ROCKS}, {PageStoreType.LOCAL}, {PageStoreType.MEM} }); diff --git a/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java b/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java index 5b0c74fd6fe7..372bc3604033 100644 --- a/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java +++ b/core/client/hdfs/src/main/java/alluxio/hadoop/LocalCacheFileSystem.java @@ -16,12 +16,14 @@ import alluxio.AlluxioURI; import alluxio.client.file.CacheContext; +import alluxio.client.file.FileInStream; import alluxio.client.file.URIStatus; import alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.LocalCacheFileInStream; import alluxio.client.file.cache.filter.CacheFilter; import alluxio.conf.AlluxioConfiguration; import alluxio.conf.PropertyKey; +import alluxio.exception.AlluxioException; import alluxio.metrics.MetricsConfig; import alluxio.metrics.MetricsSystem; import alluxio.wire.FileInfo; @@ -41,6 +43,7 @@ import java.io.IOException; import java.net.URI; import java.util.Map; +import java.util.Optional; import java.util.Properties; /** @@ -93,8 +96,8 @@ public synchronized void initialize(URI uri, org.apache.hadoop.conf.Configuratio } MetricsSystem.startSinksFromConfig(new MetricsConfig(metricsProperties)); mCacheManager = CacheManager.Factory.get(mAlluxioConf); - LocalCacheFileInStream.registerMetrics(); mCacheFilter = CacheFilter.create(mAlluxioConf); + LocalCacheFileInStream.registerMetrics(); } @Override @@ -146,19 +149,45 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { } /** - * Attempts to open the specified file for reading. + * A wrapper method to default not enforce an open call. * * @param status the status of the file to open * @param bufferSize stream buffer size in bytes, currently unused * @return an {@link FSDataInputStream} at the indicated path of a file */ public FSDataInputStream open(URIStatus status, int bufferSize) throws IOException { + return open(status, bufferSize, false); + } + + /** + * Attempts to open the specified file for reading. + * + * @param status the status of the file to open + * @param bufferSize stream buffer size in bytes, currently unused + * @param enforceOpen flag to enforce calling open to external storage + * @return an {@link FSDataInputStream} at the indicated path of a file + */ + public FSDataInputStream open(URIStatus status, int bufferSize, boolean enforceOpen) + throws IOException { if (mCacheManager == null || !mCacheFilter.needsCache(status)) { return mExternalFileSystem.open(HadoopUtils.toPath(new AlluxioURI(status.getPath())), bufferSize); } + Optional externalFileInStream; + if (enforceOpen) { + try { + // making the open call right now, instead of later when called back + externalFileInStream = Optional.of(mAlluxioFileOpener.open(status)); + } catch (AlluxioException e) { + throw new IOException(e); + } + } else { + externalFileInStream = Optional.empty(); + } + return new FSDataInputStream(new HdfsFileInputStream( - new LocalCacheFileInStream(status, mAlluxioFileOpener, mCacheManager, mAlluxioConf), + new LocalCacheFileInStream(status, mAlluxioFileOpener, mCacheManager, mAlluxioConf, + externalFileInStream), statistics)); } diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockMetaStore.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockMetaStore.java index 1564190d76b3..0a392db39958 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockMetaStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockMetaStore.java @@ -11,6 +11,7 @@ package alluxio.worker.page; +import alluxio.client.file.cache.CacheUsage; import alluxio.client.file.cache.DefaultPageMetaStore; import alluxio.client.file.cache.PageId; import alluxio.client.file.cache.PageInfo; @@ -21,6 +22,7 @@ import alluxio.client.quota.CacheScope; import alluxio.collections.IndexDefinition; import alluxio.collections.IndexedSet; +import alluxio.exception.FileDoesNotExistException; import alluxio.exception.PageNotFoundException; import alluxio.exception.runtime.BlockDoesNotExistRuntimeException; import alluxio.worker.block.BlockStoreEventListener; @@ -86,6 +88,11 @@ public PagedBlockStoreDir getFieldValue(PagedTempBlockMeta o) { } }; + @Override + public Optional getUsage() { + return Optional.empty(); + } + private class BlockPageAllocator implements Allocator { private final Allocator mDelegate; @@ -182,6 +189,11 @@ public PageInfo getPageInfo(PageId pageId) throws PageNotFoundException { return mDelegate.getPageInfo(pageId); } + @Override + public PageInfo removePage(PageId pageId, boolean isTemporary) throws PageNotFoundException { + return null; + } + @Override public ReadWriteLock getLock() { return mDelegate.getLock(); @@ -249,6 +261,11 @@ public void commitFile(String fileId, String newFileId) throws PageNotFoundExcep mDelegate.commitFile(fileId, newFileId); } + @Override + public PageStoreDir getStoreDirOfFile(String fileId) throws FileDoesNotExistException { + return null; + } + /** * @param blockId * @return the permanent block meta after committing @@ -317,6 +334,11 @@ public void reset() { mBlocks.clear(); } + @Override + public Set getAllPagesByFileId(String fileId) { + return null; + } + @Override @GuardedBy("getLock().readLock()") public PageInfo evict(CacheScope cacheScope, PageStoreDir pageStoreDir) { diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java index 41b7a95d4016..a65155a6a697 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java @@ -16,6 +16,7 @@ import alluxio.client.file.cache.PageId; import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.exception.runtime.AlluxioRuntimeException; +import alluxio.file.ByteArrayTargetBuffer; import alluxio.grpc.ErrorType; import alluxio.metrics.MetricKey; import alluxio.metrics.MetricsSystem; @@ -105,7 +106,8 @@ private long read(ByteBuf byteBuf, long offset, long length) throws IOException Preconditions.checkArgument(byteBuf.writableBytes() >= length, "buffer overflow, trying to write %s bytes, only %s writable", length, byteBuf.writableBytes()); - PageReadTargetBuffer target = new NettyBufTargetBuffer(byteBuf); + ByteArrayTargetBuffer target = + new ByteArrayTargetBuffer(byteBuf.array(), byteBuf.arrayOffset()); long bytesRead = 0; while (bytesRead < length) { long pos = offset + bytesRead; diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStoreDir.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStoreDir.java index b5d232651200..e57281af32bb 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStoreDir.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStoreDir.java @@ -14,6 +14,7 @@ import static alluxio.worker.page.PagedBlockStoreMeta.DEFAULT_MEDIUM; import static alluxio.worker.page.PagedBlockStoreMeta.DEFAULT_TIER; +import alluxio.client.file.cache.CacheUsage; import alluxio.client.file.cache.PageId; import alluxio.client.file.cache.PageInfo; import alluxio.client.file.cache.PageStore; @@ -171,6 +172,11 @@ public boolean reserve(long bytes) { return mDelegate.reserve(bytes); } + @Override + public void deleteTempPage(PageInfo bytes) { + + } + @Override public long deletePage(PageInfo pageInfo) { long blockId = BlockPageId.downcast(pageInfo.getPageId()).getBlockId(); @@ -277,4 +283,9 @@ public Set getBlockPages(long blockId) { return mBlockToPagesMap.get(blockId).stream().map(PageInfo::getPageId) .collect(Collectors.toSet()); } + + @Override + public Optional getUsage() { + return Optional.empty(); + } } diff --git a/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java b/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java index 2c21647ec766..b7aa85325f9b 100644 --- a/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java +++ b/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java @@ -13,12 +13,16 @@ import alluxio.client.file.CacheContext; import alluxio.client.file.cache.CacheManager; +import alluxio.client.file.cache.CacheUsage; import alluxio.client.file.cache.PageId; -import alluxio.client.file.cache.store.PageReadTargetBuffer; +import alluxio.file.ReadTargetBuffer; +import alluxio.network.protocol.databuffer.DataFileChannel; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; /** * Implementation of cache manager that stores cached data in byte arrays in memory. @@ -36,6 +40,11 @@ class ByteArrayCacheManager implements CacheManager { mPages = new HashMap<>(); } + @Override + public void commitFile(String fileId) { + throw new UnsupportedOperationException("commitFile method is unsupported. "); + } + @Override public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { byte[] data = new byte[page.remaining()]; @@ -46,7 +55,7 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { } @Override - public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, CacheContext cacheContext) { if (!mPages.containsKey(pageId)) { return 0; @@ -56,6 +65,24 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuf return bytesToRead; } + @Override + public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead, + ReadTargetBuffer buffer, CacheContext cacheContext, + Supplier externalDataSupplier) { + int bytesRead = get(pageId, pageOffset, + bytesToRead, buffer, cacheContext); + if (bytesRead > 0) { + return bytesRead; + } + byte[] page = externalDataSupplier.get(); + if (page.length == 0) { + return 0; + } + buffer.writeBytes(page, pageOffset, bytesToRead); + put(pageId, page, cacheContext); + return bytesToRead; + } + @Override public boolean delete(PageId pageId) { return mPages.remove(pageId) != null; @@ -72,7 +99,50 @@ public boolean append(PageId pageId, int appendAt, byte[] page, CacheContext cac } @Override - public void close() throws Exception { + public void deleteFile(String fileId) { + mPages.keySet().removeIf(pageId -> pageId.getFileId().equals(fileId)); + } + + @Override + public void deleteTempFile(String fileId) { + mPages.keySet().removeIf(pageId -> pageId.getFileId().equals(fileId)); + } + + @Override + public Optional getUsage() { + return Optional.of(new Usage()); + } + + @Override + public Optional getDataFileChannel(PageId pageId, int pageOffset, + int bytesToRead, CacheContext cacheContext) { + return Optional.empty(); + } + + class Usage implements CacheUsage { + @Override + public Optional partitionedBy(PartitionDescriptor partition) { + return Optional.empty(); + } + + @Override + public long used() { + return mPages.values().stream().mapToInt(page -> page.length).sum(); + } + + @Override + public long available() { + return Integer.MAX_VALUE; + } + + @Override + public long capacity() { + return Integer.MAX_VALUE; + } + } + + @Override + public void close() { // no-op } } diff --git a/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockWriterTest.java b/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockWriterTest.java deleted file mode 100644 index fbbc687a5105..000000000000 --- a/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockWriterTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 - * (the "License"). You may not use this work except in compliance with the License, which is - * available at www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied, as more fully set forth in the License. - * - * See the NOTICE file distributed with this work for information regarding copyright ownership. - */ - -package alluxio.worker.page; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import alluxio.AlluxioTestDirectory; -import alluxio.client.file.CacheContext; -import alluxio.client.file.cache.CacheManager; -import alluxio.client.file.cache.CacheManagerOptions; -import alluxio.client.file.cache.DefaultPageMetaStore; -import alluxio.client.file.cache.LocalCacheManager; -import alluxio.client.file.cache.PageId; -import alluxio.client.file.cache.PageMetaStore; -import alluxio.client.file.cache.PageStore; -import alluxio.client.file.cache.evictor.CacheEvictor; -import alluxio.client.file.cache.evictor.FIFOCacheEvictor; -import alluxio.client.file.cache.store.ByteArrayTargetBuffer; -import alluxio.client.file.cache.store.LocalPageStoreDir; -import alluxio.client.file.cache.store.PageStoreOptions; -import alluxio.conf.Configuration; -import alluxio.conf.InstancedConfiguration; -import alluxio.conf.PropertyKey; -import alluxio.util.CommonUtils; -import alluxio.util.WaitForOptions; -import alluxio.util.io.BufferUtils; - -import com.google.common.collect.ImmutableList; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -@RunWith(Parameterized.class) -public class PagedBlockWriterTest { - private static final long BLOCK_ID = 1L; - - @Parameterized.Parameters - public static Collection data() { - return Arrays.asList(new Object[][] { - /*file_length, chuck_size, page_size*/ - {2048, 1024, 128}, - {2049, 1024, 128}, - {2048, 1023, 128}, - {2048, 1024, 129}, - }); - } - - @Parameterized.Parameter - public int mFileLength; - - @Parameterized.Parameter(1) - public int mChunkSize; - - @Parameterized.Parameter(2) - public int mPageSize; - - private LocalCacheManager mCacheManager; - private InstancedConfiguration mConf = Configuration.copyGlobal(); - private PageMetaStore mPageMetaStore; - private CacheEvictor mEvictor; - private CacheManagerOptions mCachemanagerOptions; - private PageStoreOptions mPageStoreOptions; - private PageStore mPageStore; - private LocalPageStoreDir mPageStoreDir; - private PagedBlockWriter mWriter; - - @Rule - public ExpectedException mThrown = ExpectedException.none(); - - @Before - public void before() throws Exception { - mConf.set(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE, mPageSize); - mConf.set(PropertyKey.WORKER_PAGE_STORE_DIRS, ImmutableList.of( - AlluxioTestDirectory.createTemporaryDirectory("page_store").getAbsolutePath())); - mCachemanagerOptions = CacheManagerOptions.createForWorker(mConf); - mPageStoreOptions = mCachemanagerOptions.getPageStoreOptions().get(0); - mPageStore = PageStore.create(mPageStoreOptions); - mEvictor = new FIFOCacheEvictor(mCachemanagerOptions.getCacheEvictorOptions()); - mPageStoreDir = new LocalPageStoreDir(mPageStoreOptions, mPageStore, mEvictor); - mPageStoreDir.reset(); - mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(mPageStoreDir)); - mCacheManager = - LocalCacheManager.create(mCachemanagerOptions, mPageMetaStore); - CommonUtils.waitFor("restore completed", - () -> mCacheManager.state() == CacheManager.State.READ_WRITE, - WaitForOptions.defaults().setTimeoutMs(10000)); - mWriter = new PagedBlockWriter(mCacheManager, BLOCK_ID, mPageSize); - } - - @After - public void after() throws Exception { - mWriter.close(); - } - - @Test - public void appendByteBuf() throws Exception { - for (int offset = 0; offset < mFileLength; offset += mChunkSize) { - int bytesToWrite = Math.min(mChunkSize, mFileLength - offset); - ByteBuf buffer = Unpooled.wrappedBuffer( - BufferUtils.getIncreasingByteBuffer(bytesToWrite)); - assertEquals(bytesToWrite, mWriter.append(buffer)); - } - mWriter.close(); - mPageMetaStore.commitFile(BlockPageId.tempFileIdOf(BLOCK_ID), - BlockPageId.fileIdOf(BLOCK_ID, mFileLength)); - mPageStoreDir.commit(BlockPageId.tempFileIdOf(BLOCK_ID), - BlockPageId.fileIdOf(BLOCK_ID, mFileLength)); - verifyDataInCache(); - } - - @Test - public void append() throws Exception { - for (int offset = 0; offset < mFileLength; offset += mChunkSize) { - int bytesToWrite = Math.min(mChunkSize, mFileLength - offset); - ByteBuffer buffer = - BufferUtils.getIncreasingByteBuffer(bytesToWrite); - assertEquals(bytesToWrite, mWriter.append(buffer)); - } - mWriter.close(); - mPageMetaStore.commitFile(BlockPageId.tempFileIdOf(BLOCK_ID), - BlockPageId.fileIdOf(BLOCK_ID, mFileLength)); - mPageStoreDir.commit(BlockPageId.tempFileIdOf(BLOCK_ID), - BlockPageId.fileIdOf(BLOCK_ID, mFileLength)); - verifyDataInCache(); - } - - private void verifyDataInCache() { - List pageIds = - mCacheManager.getCachedPageIdsByFileId( - BlockPageId.fileIdOf(BLOCK_ID, mFileLength), mFileLength); - assertEquals((int) Math.ceil((double) mFileLength / mPageSize), pageIds.size()); - byte[] dataInCache = new byte[mFileLength]; - for (int i = 0; i < pageIds.size(); i++) { - PageId pageId = pageIds.get(i); - mCacheManager.get(pageId, 0, Math.min(mPageSize, mFileLength - i * mPageSize), - new ByteArrayTargetBuffer(dataInCache, i * mPageSize), - CacheContext.defaults().setTemporary(false)); - } - for (int offset = 0; offset < mFileLength; offset += mChunkSize) { - int chunkLength = Math.min(mChunkSize, mFileLength - offset); - byte[] chunk = new byte[chunkLength]; - System.arraycopy(dataInCache, offset, chunk, 0, chunkLength); - assertTrue( - BufferUtils.equalIncreasingByteArray(chunkLength, chunk)); - } - } -}