Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update local cache to catch 3.x #18574

Merged
merged 7 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
package alluxio.client.file.cache;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.ByteArrayTargetBuffer;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.file.ReadTargetBuffer;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.metrics.MultiDimensionalMetricsSystem;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.resource.LockResource;

import com.codahale.metrics.Counter;
Expand All @@ -27,16 +30,19 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;

/**
* Interface for managing cached pages.
*/
public interface CacheManager extends AutoCloseable {
public interface CacheManager extends AutoCloseable, CacheStatus {
Logger LOG = LoggerFactory.getLogger(CacheManager.class);

/**
* State of a cache.
Expand Down Expand Up @@ -119,6 +125,12 @@ public static CacheManager create(AlluxioConfiguration conf,
try {
boolean isShadowCacheEnabled =
conf.getBoolean(PropertyKey.USER_CLIENT_CACHE_SHADOW_ENABLED);
boolean isNettyDataTransmissionEnable = false;
// Note that Netty data transmission doesn't support async write
if (isNettyDataTransmissionEnable) {
options.setIsAsyncWriteEnabled(false);
}
MultiDimensionalMetricsSystem.setCacheStorageSupplier(pageMetaStore::bytes);
if (isShadowCacheEnabled) {
return new NoExceptionCacheManager(
new CacheManagerWithShadowCache(LocalCacheManager.create(options, pageMetaStore),
Expand All @@ -140,6 +152,8 @@ static void clear() {
CacheManager manager = CACHE_MANAGER.getAndSet(null);
if (manager != null) {
manager.close();
MultiDimensionalMetricsSystem.setCacheStorageSupplier(
MultiDimensionalMetricsSystem.NULL_SUPPLIER);
}
} catch (Exception e) {
LOG.warn("Failed to close CacheManager: {}", e.toString());
Expand Down Expand Up @@ -236,6 +250,20 @@ default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
return get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer, CacheContext.defaults());
}

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
*
* @param pageId page identifier
* @param pageOffset offset into the page
* @param buffer destination buffer to write
* @param cacheContext cache related context
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
default int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
CacheContext cacheContext) {
throw new UnsupportedOperationException("This method is unsupported. ");
}

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
*
Expand Down Expand Up @@ -263,9 +291,24 @@ default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, i
* @param cacheContext cache related context
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext);

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
* Loads the page otherwise.
*
* @param pageId page identifier
* @param pageOffset offset into the page
* @param bytesToRead number of bytes to read in this page
* @param buffer destination buffer to write
* @param cacheContext cache related context
* @param externalDataSupplier the external data supplier to read a page
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier);

/**
* Get page ids by the given file id.
* @param fileId file identifier
Expand All @@ -276,6 +319,20 @@ default List<PageId> getCachedPageIdsByFileId(String fileId, long fileLength) {
throw new UnsupportedOperationException();
}

/**
* Deletes all pages of the given file.
*
* @param fileId the file id of the target file
*/
void deleteFile(String fileId);

/**
* Deletes all temporary pages of the given file.
*
* @param fileId the file id of the target file
*/
void deleteTempFile(String fileId);

/**
* Deletes a page from the cache.
*
Expand All @@ -289,6 +346,14 @@ default List<PageId> getCachedPageIdsByFileId(String fileId, long fileLength) {
*/
State state();

/**
* @param pageId the page id
* @return true if the page is cached. This method is not thread-safe as no lock is acquired
*/
default boolean hasPageUnsafe(PageId pageId) {
throw new UnsupportedOperationException();
}

/**
*
* @param pageId
Expand All @@ -306,4 +371,25 @@ default List<PageId> getCachedPageIdsByFileId(String fileId, long fileLength) {
default void invalidate(Predicate<PageInfo> predicate) {
throw new UnsupportedOperationException();
}

@Override
Optional<CacheUsage> getUsage();

/**
* Commit the File.
* @param fileId the file ID
*/
void commitFile(String fileId);

/**
* Get a {@link DataFileChannel} which wraps a {@link io.netty.channel.FileRegion}.
* @param pageId the page id
* @param pageOffset the offset inside the page
* @param bytesToRead the bytes to read
* @param cacheContext the cache context
* @return an object of {@link DataFileChannel}
*/
Optional<DataFileChannel> getDataFileChannel(
PageId pageId, int pageOffset, int bytesToRead, CacheContext cacheContext)
throws PageNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@
import static alluxio.client.file.CacheContext.StatsUnit.BYTE;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.quota.CacheScope;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ReadTargetBuffer;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.DataFileChannel;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

/**
Expand All @@ -46,15 +50,33 @@ public CacheManagerWithShadowCache(CacheManager cacheManager, AlluxioConfigurati
mShadowCacheManager = ShadowCacheManager.create(conf);
}

@Override
public void commitFile(String fileId) {
mCacheManager.commitFile(fileId);
}

@Override
public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
updateShadowCache(pageId, page.remaining(), cacheContext);
return mCacheManager.put(pageId, page, cacheContext);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
CacheContext cacheContext) {
getOrUpdateShadowCache(pageId, bytesToRead, cacheContext);
return mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext);
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier) {
getOrUpdateShadowCache(pageId, bytesToRead, cacheContext);
return mCacheManager.getAndLoad(pageId, pageOffset, bytesToRead,
buffer, cacheContext, externalDataSupplier);
}

private void getOrUpdateShadowCache(PageId pageId, int bytesToRead, CacheContext cacheContext) {
int nread = mShadowCacheManager.get(pageId, bytesToRead, getCacheScope(cacheContext));
if (nread > 0) {
Metrics.SHADOW_CACHE_PAGES_HIT.inc();
Expand All @@ -64,7 +86,6 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuf
}
Metrics.SHADOW_CACHE_PAGES_READ.inc();
Metrics.SHADOW_CACHE_BYTES_READ.inc(bytesToRead);
return mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext);
}

/**
Expand Down Expand Up @@ -142,6 +163,27 @@ public void close() throws Exception {
mCacheManager.close();
}

@Override
public void deleteFile(String fileId) {
mCacheManager.deleteFile(fileId);
}

@Override
public void deleteTempFile(String fileId) {
mCacheManager.deleteTempFile(fileId);
}

@Override
public Optional<CacheUsage> getUsage() {
return mCacheManager.getUsage();
}

@Override
public Optional<DataFileChannel> getDataFileChannel(PageId pageId, int pageOffset,
int bytesToRead, CacheContext cacheContext) throws PageNotFoundException {
return mCacheManager.getDataFileChannel(pageId, pageOffset, bytesToRead, cacheContext);
}

/**
* Decrease each item's clock and clean stale items.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.cache;

import java.util.Optional;

/**
* Mixin interface for various cache status info.
*/
public interface CacheStatus {
/**
* Gets cache usage.
*
* @return cache usage, or none if reporting usage info is not supported
*/
Optional<CacheUsage> getUsage();
}
Loading
Loading