Skip to content

Commit

Permalink
fix compile error
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Apr 11, 2024
1 parent c5e670c commit 74df564
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import alluxio.Constants;
import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.file.ReadTargetBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.util.io.BufferUtils;

import org.junit.Before;
Expand All @@ -32,6 +33,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Tests for the {@link LocalCacheManager} class.
Expand Down Expand Up @@ -228,7 +231,7 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext) {
if (!mCache.containsKey(pageId)) {
return 0;
Expand All @@ -241,6 +244,29 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuf
return bytesToRead;
}

@Override
public void commitFile(String fileId) {
throw new UnsupportedOperationException("commitFile method is unsupported. ");
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext,
Supplier<byte[]> externalDataSupplier) {
int bytesRead = get(pageId, pageOffset,
bytesToRead, buffer, cacheContext);
if (bytesRead > 0) {
return bytesRead;
}
byte[] page = externalDataSupplier.get();
if (page.length == 0) {
return 0;
}
buffer.writeBytes(page, pageOffset, bytesToRead);
put(pageId, page, cacheContext);
return bytesToRead;
}

@Override
public boolean delete(PageId pageId) {
if (mCache.containsKey(pageId)) {
Expand All @@ -261,6 +287,27 @@ public boolean append(PageId pageId, int appendAt, byte[] page, CacheContext cac
}

@Override
public void close() throws Exception {}
public void deleteFile(String fileId) {
// no-op
}

@Override
public void deleteTempFile(String fileId) {
// no-op
}

@Override
public Optional<CacheUsage> getUsage() {
return Optional.empty();
}

@Override
public Optional<DataFileChannel> getDataFileChannel(PageId pageId, int pageOffset,
int bytesToRead, CacheContext cacheContext) {
return Optional.empty();
}

@Override
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
package alluxio.client.file.cache;

import alluxio.client.file.cache.store.LocalPageStore;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ReadTargetBuffer;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -44,7 +44,7 @@ public void delete(PageId pageId) throws IOException, PageNotFoundException {
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
boolean isTemporary)
throws IOException, PageNotFoundException {
checkStopHanging();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import static org.junit.Assert.fail;

import alluxio.Constants;
import alluxio.client.file.cache.store.ByteArrayTargetBuffer;
import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.util.io.BufferUtils;

import org.junit.Before;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.file.ByteArrayTargetBuffer;

import org.junit.Before;
import org.junit.Rule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.file.ByteArrayTargetBuffer;

import org.junit.Before;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.util.io.BufferUtils;

import org.junit.After;
Expand All @@ -40,7 +41,6 @@ public class PageStoreTest {
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{PageStoreType.ROCKS},
{PageStoreType.LOCAL},
{PageStoreType.MEM}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

import alluxio.AlluxioURI;
import alluxio.client.file.CacheContext;
import alluxio.client.file.FileInStream;
import alluxio.client.file.URIStatus;
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.LocalCacheFileInStream;
import alluxio.client.file.cache.filter.CacheFilter;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.metrics.MetricsConfig;
import alluxio.metrics.MetricsSystem;
import alluxio.wire.FileInfo;
Expand All @@ -41,6 +43,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
Expand Down Expand Up @@ -93,8 +96,8 @@ public synchronized void initialize(URI uri, org.apache.hadoop.conf.Configuratio
}
MetricsSystem.startSinksFromConfig(new MetricsConfig(metricsProperties));
mCacheManager = CacheManager.Factory.get(mAlluxioConf);
LocalCacheFileInStream.registerMetrics();
mCacheFilter = CacheFilter.create(mAlluxioConf);
LocalCacheFileInStream.registerMetrics();
}

@Override
Expand Down Expand Up @@ -146,19 +149,45 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
}

/**
* Attempts to open the specified file for reading.
* A wrapper method to default not enforce an open call.
*
* @param status the status of the file to open
* @param bufferSize stream buffer size in bytes, currently unused
* @return an {@link FSDataInputStream} at the indicated path of a file
*/
public FSDataInputStream open(URIStatus status, int bufferSize) throws IOException {
return open(status, bufferSize, false);
}

/**
* Attempts to open the specified file for reading.
*
* @param status the status of the file to open
* @param bufferSize stream buffer size in bytes, currently unused
* @param enforceOpen flag to enforce calling open to external storage
* @return an {@link FSDataInputStream} at the indicated path of a file
*/
public FSDataInputStream open(URIStatus status, int bufferSize, boolean enforceOpen)
throws IOException {
if (mCacheManager == null || !mCacheFilter.needsCache(status)) {
return mExternalFileSystem.open(HadoopUtils.toPath(new AlluxioURI(status.getPath())),
bufferSize);
}
Optional<FileInStream> externalFileInStream;
if (enforceOpen) {
try {
// making the open call right now, instead of later when called back
externalFileInStream = Optional.of(mAlluxioFileOpener.open(status));
} catch (AlluxioException e) {
throw new IOException(e);
}
} else {
externalFileInStream = Optional.empty();
}

return new FSDataInputStream(new HdfsFileInputStream(
new LocalCacheFileInStream(status, mAlluxioFileOpener, mCacheManager, mAlluxioConf),
new LocalCacheFileInStream(status, mAlluxioFileOpener, mCacheManager, mAlluxioConf,
externalFileInStream),
statistics));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package alluxio.worker.page;

import alluxio.client.file.cache.CacheUsage;
import alluxio.client.file.cache.DefaultPageMetaStore;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageInfo;
Expand All @@ -21,6 +22,7 @@
import alluxio.client.quota.CacheScope;
import alluxio.collections.IndexDefinition;
import alluxio.collections.IndexedSet;
import alluxio.exception.FileDoesNotExistException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.runtime.BlockDoesNotExistRuntimeException;
import alluxio.worker.block.BlockStoreEventListener;
Expand Down Expand Up @@ -86,6 +88,11 @@ public PagedBlockStoreDir getFieldValue(PagedTempBlockMeta o) {
}
};

@Override
public Optional<CacheUsage> getUsage() {
return Optional.empty();
}

private class BlockPageAllocator implements Allocator {
private final Allocator mDelegate;

Expand Down Expand Up @@ -182,6 +189,11 @@ public PageInfo getPageInfo(PageId pageId) throws PageNotFoundException {
return mDelegate.getPageInfo(pageId);
}

@Override
public PageInfo removePage(PageId pageId, boolean isTemporary) throws PageNotFoundException {
return null;
}

@Override
public ReadWriteLock getLock() {
return mDelegate.getLock();
Expand Down Expand Up @@ -249,6 +261,11 @@ public void commitFile(String fileId, String newFileId) throws PageNotFoundExcep
mDelegate.commitFile(fileId, newFileId);
}

@Override
public PageStoreDir getStoreDirOfFile(String fileId) throws FileDoesNotExistException {
return null;
}

/**
* @param blockId
* @return the permanent block meta after committing
Expand Down Expand Up @@ -317,6 +334,11 @@ public void reset() {
mBlocks.clear();
}

@Override
public Set<PageInfo> getAllPagesByFileId(String fileId) {
return null;
}

@Override
@GuardedBy("getLock().readLock()")
public PageInfo evict(CacheScope cacheScope, PageStoreDir pageStoreDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.grpc.ErrorType;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
Expand Down Expand Up @@ -105,7 +106,8 @@ private long read(ByteBuf byteBuf, long offset, long length) throws IOException
Preconditions.checkArgument(byteBuf.writableBytes() >= length,
"buffer overflow, trying to write %s bytes, only %s writable",
length, byteBuf.writableBytes());
PageReadTargetBuffer target = new NettyBufTargetBuffer(byteBuf);
ByteArrayTargetBuffer target =
new ByteArrayTargetBuffer(byteBuf.array(), byteBuf.arrayOffset());
long bytesRead = 0;
while (bytesRead < length) {
long pos = offset + bytesRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static alluxio.worker.page.PagedBlockStoreMeta.DEFAULT_MEDIUM;
import static alluxio.worker.page.PagedBlockStoreMeta.DEFAULT_TIER;

import alluxio.client.file.cache.CacheUsage;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageInfo;
import alluxio.client.file.cache.PageStore;
Expand Down Expand Up @@ -171,6 +172,11 @@ public boolean reserve(long bytes) {
return mDelegate.reserve(bytes);
}

@Override
public void deleteTempPage(PageInfo bytes) {

}

@Override
public long deletePage(PageInfo pageInfo) {
long blockId = BlockPageId.downcast(pageInfo.getPageId()).getBlockId();
Expand Down Expand Up @@ -277,4 +283,9 @@ public Set<PageId> getBlockPages(long blockId) {
return mBlockToPagesMap.get(blockId).stream().map(PageInfo::getPageId)
.collect(Collectors.toSet());
}

@Override
public Optional<CacheUsage> getUsage() {
return Optional.empty();
}
}
Loading

0 comments on commit 74df564

Please sign in to comment.