Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Apr 22, 2024
1 parent 68bc2df commit 446ec17
Showing 1 changed file with 142 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.file.NettyBufTargetBuffer;
import alluxio.file.ReadTargetBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.BufferUtils;
Expand All @@ -46,6 +51,9 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultFileRegion;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
Expand Down Expand Up @@ -669,6 +677,10 @@ public void asyncRestoreReadOnly() throws Exception {
assertTrue(mCacheManager.delete(PAGE_ID2));
}

/**
* Invalid page file will be deleted and cache manager will start normally.
* @throws Exception
*/
@Test
public void syncRestoreUnknownFile() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_RESTORE_ENABLED, false);
Expand All @@ -685,10 +697,14 @@ public void syncRestoreUnknownFile() throws Exception {
mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
mCacheManager = LocalCacheManager.create(mCacheManagerOptions, mPageMetaStore);
assertEquals(CacheManager.State.READ_WRITE, mCacheManager.state());
assertEquals(0, mCacheManager.get(PAGE_ID1, PAGE1.length, mBuf, 0));
assertEquals(0, mCacheManager.get(pageUuid, PAGE2.length, mBuf, 0));
assertEquals(PAGE1.length, mCacheManager.get(PAGE_ID1, PAGE1.length, mBuf, 0));
assertEquals(PAGE2.length, mCacheManager.get(pageUuid, PAGE2.length, mBuf, 0));
}

/**
* Invalid page file will be deleted and cache manager will start normally.
* @throws Exception
*/
@Test
public void asyncRestoreUnknownFile() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_RESTORE_ENABLED, true);
Expand All @@ -703,8 +719,8 @@ public void asyncRestoreUnknownFile() throws Exception {
FileUtils.createFile(Paths.get(rootDir, "invalidPageFile").toString());
mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
mCacheManager = createLocalCacheManager(mConf, mPageMetaStore);
assertEquals(0, mCacheManager.get(PAGE_ID1, PAGE1.length, mBuf, 0));
assertEquals(0, mCacheManager.get(pageUuid, PAGE2.length, mBuf, 0));
assertEquals(PAGE1.length, mCacheManager.get(PAGE_ID1, PAGE1.length, mBuf, 0));
assertEquals(PAGE2.length, mCacheManager.get(pageUuid, PAGE2.length, mBuf, 0));
}

@Test
Expand Down Expand Up @@ -806,6 +822,29 @@ public void asyncRestoreWithMorePagesThanCapacity() throws Exception {
assertEquals(0, mCacheManager.get(pageUuid, PAGE2.length, mBuf, 0));
}

@Test
public void ttlDeleteOldPagesWhenRestore() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_TTL_THRESHOLD_SECONDS, 5);
mConf.set(PropertyKey.USER_CLIENT_CACHE_TTL_ENABLED, true);
mCacheManagerOptions = CacheManagerOptions.create(mConf);
mCacheManager.close();
PageStoreDir dir = PageStoreDir.createPageStoreDirs(mCacheManagerOptions)
.get(0); // previous page store has been closed
PageId pageUuid = new PageId(UUID.randomUUID().toString(), 0);
dir.getPageStore().put(PAGE_ID1, PAGE1);
dir.getPageStore().put(PAGE_ID2, PAGE2);

dir = PageStoreDir.createPageStoreDirs(mCacheManagerOptions).get(0);
mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
Thread.sleep(6000);
dir.getPageStore().put(pageUuid,
BufferUtils.getIncreasingByteArray(PAGE1.length + PAGE2.length + 1));
mCacheManager = createLocalCacheManager(mConf, mPageMetaStore);
assertFalse(mCacheManager.hasPageUnsafe(PAGE_ID1));
assertFalse(mCacheManager.hasPageUnsafe(PAGE_ID2));
assertTrue(mCacheManager.hasPageUnsafe(pageUuid)); // we should have the new page
}

@Test
public void asyncCache() throws Exception {
// this must be smaller than the number of locks in the page store for the test to succeed
Expand Down Expand Up @@ -965,6 +1004,58 @@ public void getTimeout() throws Exception {
pageStore.setGetHanging(false);
}

@Test
public void getFaultyReadWithNoExceptionManager() throws Exception {
PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
FaultyPageStore pageStore = new FaultyPageStore();
PageStoreDir dir =
new LocalPageStoreDir(pageStoreOptions, pageStore, mEvictor);

mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
NoExceptionCacheManager cacheManager =
new NoExceptionCacheManager(createLocalCacheManager(mConf, mPageMetaStore));
cacheManager.put(PAGE_ID1, PAGE1);
ByteArrayTargetBuffer targetBuffer = new ByteArrayTargetBuffer(mBuf, 0);
pageStore.setGetFaulty(true);
assertEquals(-1, cacheManager.get(PAGE_ID1, PAGE1.length,
targetBuffer, CacheContext.defaults()));
assertEquals(0, targetBuffer.offset());
}

@Test
public void getFaultyReadWithLocalCacheManager() throws Exception {
PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
FaultyPageStore pageStore = new FaultyPageStore();
PageStoreDir dir =
new LocalPageStoreDir(pageStoreOptions, pageStore, mEvictor);

mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
LocalCacheManager cacheManager = createLocalCacheManager(mConf, mPageMetaStore);
cacheManager.put(PAGE_ID1, PAGE1);
ByteArrayTargetBuffer targetBuffer = new ByteArrayTargetBuffer(mBuf, 0);
pageStore.setGetFaulty(true);
assertEquals(-1, cacheManager.get(PAGE_ID1, PAGE1.length,
targetBuffer, CacheContext.defaults()));
assertEquals(0, targetBuffer.offset());
}

@Test
public void getCorruptedReadWithLocalCacheManager() throws Exception {
PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
FaultyPageStore pageStore = new FaultyPageStore();
PageStoreDir dir =
new LocalPageStoreDir(pageStoreOptions, pageStore, mEvictor);

mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
LocalCacheManager cacheManager = createLocalCacheManager(mConf, mPageMetaStore);
cacheManager.put(PAGE_ID1, PAGE1);
ByteArrayTargetBuffer targetBuffer = new ByteArrayTargetBuffer(mBuf, 0);
pageStore.setGetCorrupted(true);
assertEquals(-1, cacheManager.get(PAGE_ID1, PAGE1.length,
targetBuffer, CacheContext.defaults()));
assertEquals(0, targetBuffer.offset());
}

@Test
public void deleteTimeout() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_TIMEOUT_DURATION, "2s");
Expand Down Expand Up @@ -1090,6 +1181,28 @@ public void listPageIds() throws Exception {
0).get(0));
}

@Test
public void getDataFileChannel() throws Exception {
mCacheManager = createLocalCacheManager();
mCacheManager.put(PAGE_ID1, PAGE1);
CacheContext cacheContext = CacheContext.defaults();
Optional<DataFileChannel> dataFileChannel = mCacheManager.getDataFileChannel(PAGE_ID1,
0, PAGE1.length, cacheContext);
assertNotNull(dataFileChannel);
assertEquals(dataFileChannel.isPresent(), true);
assertEquals(dataFileChannel.get().getNettyOutput() instanceof DefaultFileRegion, true);
DefaultFileRegion defaultFileRegion =
(DefaultFileRegion) dataFileChannel.get().getNettyOutput();
ByteBuf buf = Unpooled.buffer(PAGE1.length);
NettyBufTargetBuffer targetBuffer = new NettyBufTargetBuffer(buf);
long bytesTransferred = defaultFileRegion.transferTo(targetBuffer.byteChannel(), 0);
assertEquals(bytesTransferred, PAGE1.length);

byte[] bytes = new byte[PAGE1.length];
buf.readBytes(bytes);
assertArrayEquals(PAGE1, bytes);
}

/**
* A PageStore where put can throw IOException on put or delete.
*/
Expand All @@ -1100,6 +1213,23 @@ public FaultyPageStore() {

private AtomicBoolean mPutFaulty = new AtomicBoolean(false);
private AtomicBoolean mDeleteFaulty = new AtomicBoolean(false);
private AtomicBoolean mGetFaulty = new AtomicBoolean(false);

private AtomicBoolean mGetCorrupted = new AtomicBoolean(false);

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
if (mGetFaulty.get()) {
target.offset(target.offset() + 100);
throw new IOException("Page read fault");
}
if (mGetCorrupted.get()) {
target.offset(target.offset() + 100);
throw new PageCorruptedException("page corrupted");
}
return super.get(pageId, pageOffset, bytesToRead, target, isTemporary);
}

@Override
public void put(PageId pageId, ByteBuffer page, boolean isTemporary) throws IOException {
Expand All @@ -1124,6 +1254,14 @@ void setPutFaulty(boolean faulty) {
void setDeleteFaulty(boolean faulty) {
mDeleteFaulty.set(faulty);
}

void setGetFaulty(boolean faulty) {
mGetFaulty.set(faulty);
}

void setGetCorrupted(boolean faulty) {
mGetCorrupted.set(faulty);
}
}

/**
Expand Down

0 comments on commit 446ec17

Please sign in to comment.