Skip to content

Commit

Permalink
add fsview cache
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi committed Dec 13, 2024
1 parent b2121cb commit d74fea1
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hudi.source.HudiPartitionMgr;
import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class ExternalMetaCacheMgr {
// catalog id -> table schema cache
private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
// hudi metadata cache manager (partition processors and fs view processors)
private final HudiPartitionMgr hudiPartitionMgr;
private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
Expand Down Expand Up @@ -122,7 +123,7 @@ public ExternalMetaCacheMgr() {
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
Expand Down Expand Up @@ -164,7 +165,15 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
}

public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
return hudiPartitionMgr.getPartitionProcessor(catalog);
return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
}

/**
 * Looks up the Hudi file-system-view processor for the given catalog.
 *
 * <p>The lookup is delegated entirely to {@code hudiMetadataCacheMgr}.
 *
 * @param catalog the external catalog whose processor is requested
 * @return the processor handed back by the Hudi metadata cache manager
 */
public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
    final HudiCachedFsViewProcessor fsViewProcessor = hudiMetadataCacheMgr.getFsViewProcessor(catalog);
    return fsViewProcessor;
}

/**
 * Exposes the Hudi metadata cache manager held by this instance.
 *
 * @return the {@link HudiMetadataCacheMgr} created in the constructor
 */
public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
    return this.hudiMetadataCacheMgr;
}

public IcebergMetadataCache getIcebergMetadataCache() {
Expand Down Expand Up @@ -194,7 +203,7 @@ public void removeCache(long catalogId) {
if (schemaCacheMap.remove(catalogId) != null) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
hudiMetadataCacheMgr.removeCache(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
Expand All @@ -210,7 +219,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
if (metaCache != null) {
metaCache.invalidateTableCache(dbName, tblName);
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
hudiMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
Expand All @@ -229,7 +238,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
Expand All @@ -247,7 +256,7 @@ public void invalidateCatalogCache(long catalogId) {
if (metaCache != null) {
metaCache.invalidateAll();
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.hudi.source.HudiLocalEngineContext;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVBaseTableIf;
Expand Down Expand Up @@ -76,10 +75,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -127,9 +123,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
private HoodieTableMetaClient hudiClient = null;
private final byte[] hudiClientLock = new byte[0];

private volatile HoodieTableFileSystemView hudiFsView = null;
private final byte[] hudiFsViewLock = new byte[0];

static {
SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
Expand Down Expand Up @@ -1043,21 +1036,4 @@ public HoodieTableMetaClient getHudiClient() {
return hudiClient;
}
}

/**
 * Returns the file system view stored on this table, building it on first use.
 *
 * <p>The view is created from the {@code hudiClient} passed on the call that finds no stored view;
 * once stored, every later call returns that same view without looking at its argument.
 *
 * @param hudiClient meta client used to build the view when none is stored yet
 * @return the stored {@link HoodieTableFileSystemView}
 */
public HoodieTableFileSystemView getHudiFsView(HoodieTableMetaClient hudiClient) {
// Fast path: the field is volatile, so an already-built view is returned without locking.
if (hudiFsView != null) {
return hudiFsView;
}
synchronized (hudiFsViewLock) {
// Check again under the lock: another thread may have built the view in the meantime.
if (hudiFsView != null) {
return hudiFsView;
}
// If we need to cache fsView later,
// we can use `FileSystemViewManager.createViewManagerWithTableMetadata` to get view manager
// Metadata config is built with the builder's defaults; nothing is overridden here.
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
HudiLocalEngineContext ctx = new HudiLocalEngineContext(hudiClient.getStorageConf());
hudiFsView = FileSystemViewManager.createInMemoryFileSystemView(ctx, hudiClient, metadataConfig);
return hudiFsView;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi.source;

import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.ExternalMetaCacheMgr;

import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;

/**
 * Caches {@link HoodieTableFileSystemView} instances, keyed by database name, table name and the
 * base path of the table's {@link HoodieTableMetaClient}.
 *
 * <p>Views are created on demand by the cache loader and can be dropped for a whole catalog, a
 * database or a single table.
 */
public class HudiCachedFsViewProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(HudiCachedFsViewProcessor.class);
    private final LoadingCache<FsViewKey, HoodieTableFileSystemView> fsViewCache;

    /**
     * Builds the backing cache.
     *
     * @param executor executor handed to {@code CacheFactory.buildCache}
     */
    public HudiCachedFsViewProcessor(ExecutorService executor) {
        // NOTE(review): the meaning of each CacheFactory argument is defined by CacheFactory, which
        // is not visible here. 28800 is presumably a duration in seconds (8 hours) - confirm.
        CacheFactory fsViewCacheFactory = new CacheFactory(
                OptionalLong.of(28800L),
                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
                Config.max_external_table_cache_num,
                true,
                null);
        this.fsViewCache = fsViewCacheFactory.buildCache(this::createFsView, null, executor);
    }

    /** Cache loader: creates an in-memory file system view from the meta client carried by the key. */
    private HoodieTableFileSystemView createFsView(FsViewKey key) {
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
        HudiLocalEngineContext ctx = new HudiLocalEngineContext(key.getClient().getStorageConf());
        return FileSystemViewManager.createInMemoryFileSystemView(ctx, key.getClient(), metadataConfig);
    }

    /**
     * Returns the cached file system view for a table, loading it if absent.
     *
     * <p>Keys compare by database name, table name and the client's base path, so a view built from
     * an earlier client is returned for a later client of the same table and base path.
     *
     * @param dbName database name
     * @param tbName table name
     * @param hudiClient meta client of the table; must not be null
     * @return the cached or newly created view
     * @throws NullPointerException if {@code hudiClient} is null
     */
    public HoodieTableFileSystemView getFsView(String dbName, String tbName, HoodieTableMetaClient hudiClient) {
        return fsViewCache.get(new FsViewKey(dbName, tbName, hudiClient));
    }

    /**
     * Delegates to the cache's {@code cleanUp()}.
     *
     * <p>NOTE(review): per the Caffeine documentation {@code cleanUp()} performs pending maintenance;
     * it does not by itself discard live entries. Use {@link #invalidateAll()} to drop them.
     */
    public void cleanUp() {
        fsViewCache.cleanUp();
    }

    /** Drops every cached view. */
    public void invalidateAll() {
        fsViewCache.invalidateAll();
    }

    /**
     * Drops the cached views of every table in the given database.
     *
     * @param dbName database name to match
     */
    public void invalidateDbCache(String dbName) {
        // Removing through the key-set view removes the entries from the cache itself.
        fsViewCache.asMap().keySet().removeIf(key -> Objects.equals(key.getDbName(), dbName));
    }

    /**
     * Drops the cached views of the given table.
     *
     * @param dbName database name to match
     * @param tbName table name to match
     */
    public void invalidateTableCache(String dbName, String tbName) {
        fsViewCache.asMap().keySet().removeIf(
                key -> Objects.equals(key.getDbName(), dbName) && Objects.equals(key.getTbName(), tbName));
    }

    /**
     * Immutable cache key. Equality and hash code use the database name, the table name and the
     * base path of the meta client; the client instance itself is not compared.
     */
    private static class FsViewKey {
        private final String dbName;
        private final String tbName;
        private final HoodieTableMetaClient client;

        public FsViewKey(String dbName, String tbName, HoodieTableMetaClient client) {
            this.dbName = dbName;
            this.tbName = tbName;
            // Fail fast with a clear message; equals/hashCode dereference the client.
            this.client = Objects.requireNonNull(client, "client");
        }

        public String getDbName() {
            return dbName;
        }

        public String getTbName() {
            return tbName;
        }

        public HoodieTableMetaClient getClient() {
            return client;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FsViewKey other = (FsViewKey) o;
            return Objects.equals(dbName, other.dbName)
                    && Objects.equals(tbName, other.tbName)
                    && Objects.equals(client.getBasePathV2(), other.client.getBasePathV2());
        }

        @Override
        public int hashCode() {
            return Objects.hash(dbName, tbName, client.getBasePathV2());
        }
    }

    /**
     * Reports statistics of the view cache.
     *
     * @return a map with the single key {@code "hudi_fs_view_cache"} whose value is produced by
     *         {@code ExternalMetaCacheMgr.getCacheStats} from the cache stats and estimated size
     */
    public Map<String, Map<String, String>> getCacheStats() {
        Map<String, Map<String, String>> res = Maps.newHashMap();
        res.put("hudi_fs_view_cache",
                ExternalMetaCacheMgr.getCacheStats(fsViewCache.stats(), fsViewCache.estimatedSize()));
        return res;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;

public class HudiPartitionMgr {
public class HudiMetadataCacheMgr {
private final Map<Long, HudiPartitionProcessor> partitionProcessors = Maps.newConcurrentMap();
private final Map<Long, HudiCachedFsViewProcessor> fsViewProcessors = Maps.newConcurrentMap();
private final ExecutorService executor;

public HudiPartitionMgr(ExecutorService executor) {
public HudiMetadataCacheMgr(ExecutorService executor) {
this.executor = executor;
}

Expand All @@ -43,31 +44,68 @@ public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) {
});
}

public void removePartitionProcessor(long catalogId) {
HudiPartitionProcessor processor = partitionProcessors.remove(catalogId);
if (processor != null) {
processor.cleanUp();
/**
 * Returns the file-system-view processor registered for the catalog, creating and registering
 * one on first request.
 *
 * @param catalog catalog whose id keys the processor
 * @return the existing or newly created processor
 * @throws RuntimeException if a processor has to be created and the catalog is not an
 *         {@code HMSExternalCatalog}
 */
public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
    return fsViewProcessors.computeIfAbsent(catalog.getId(), id -> {
        // Guard first: only hive-compatible catalogs get a processor.
        if (!(catalog instanceof HMSExternalCatalog)) {
            throw new RuntimeException("Hudi only supports hive(or compatible) catalog now");
        }
        return new HudiCachedFsViewProcessor(executor);
    });
}

/**
 * Unregisters both Hudi processors of a catalog and calls {@code cleanUp()} on each one that
 * was registered.
 *
 * @param catalogId id of the catalog whose processors are removed
 */
public void removeCache(long catalogId) {
    // Partition processor first, then the fs view processor.
    HudiPartitionProcessor removedPartitionProcessor = partitionProcessors.remove(catalogId);
    if (removedPartitionProcessor != null) {
        removedPartitionProcessor.cleanUp();
    }

    HudiCachedFsViewProcessor removedFsViewProcessor = fsViewProcessors.remove(catalogId);
    if (removedFsViewProcessor != null) {
        removedFsViewProcessor.cleanUp();
    }
}

public void cleanPartitionProcess(long catalogId) {
/**
 * Invalidates the Hudi caches of a catalog while keeping its processors registered.
 *
 * <p>The partition processor is asked to clean up and the fs view processor drops all of its
 * entries. Both processors are looked up with {@code get}; unregistering them is the job of
 * {@code removeCache}.
 *
 * @param catalogId id of the catalog to invalidate
 */
public void invalidateCatalogCache(long catalogId) {
    HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
    if (processor != null) {
        processor.cleanUp();
    }
    // Look up, do not remove: invalidation must not unregister the processor.
    HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId);
    if (fsViewProcessor != null) {
        fsViewProcessor.invalidateAll();
    }
}

public void cleanDatabasePartitions(long catalogId, String dbName) {
/**
 * Invalidates the Hudi cache entries that belong to one database of a catalog.
 *
 * <p>Both processors are looked up with {@code get} so that entries of other databases in the
 * same catalog stay cached and the processors stay registered.
 *
 * @param catalogId id of the catalog that owns the database
 * @param dbName name of the database to invalidate
 */
public void invalidateDbCache(long catalogId, String dbName) {
    HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
    if (processor != null) {
        processor.cleanDatabasePartitions(dbName);
    }
    // Look up, do not remove: removing here would discard the whole catalog's fs view processor.
    HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId);
    if (fsViewProcessor != null) {
        fsViewProcessor.invalidateDbCache(dbName);
    }
}

public void cleanTablePartitions(long catalogId, String dbName, String tblName) {
/**
 * Invalidates the Hudi cache entries that belong to one table of a catalog.
 *
 * <p>Both processors are looked up with {@code get} so that entries of other tables in the same
 * catalog stay cached and the processors stay registered.
 *
 * @param catalogId id of the catalog that owns the table
 * @param dbName name of the database containing the table
 * @param tblName name of the table to invalidate
 */
public void invalidateTableCache(long catalogId, String dbName, String tblName) {
    HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
    if (processor != null) {
        processor.cleanTablePartitions(dbName, tblName);
    }
    // Look up, do not remove: removing here would discard the whole catalog's fs view processor.
    HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId);
    if (fsViewProcessor != null) {
        fsViewProcessor.invalidateTableCache(dbName, tblName);
    }
}

/**
 * Collects the cache statistics of both Hudi processors of a catalog into one map.
 *
 * <p>The processors are obtained through {@code getPartitionProcessor} and
 * {@code getFsViewProcessor}. The partition processor is cast to
 * {@code HudiCachedPartitionProcessor} to reach its statistics.
 *
 * @param catalog catalog whose statistics are requested
 * @return partition-cache statistics followed by fs-view-cache statistics, merged by key
 */
public Map<String, Map<String, String>> getCacheStats(ExternalCatalog catalog) {
    Map<String, Map<String, String>> stats = Maps.newHashMap();
    // Partition stats first, then fs view stats, so a duplicate key keeps the fs view value.
    stats.putAll(((HudiCachedPartitionProcessor) getPartitionProcessor(catalog)).getCacheStats());
    stats.putAll(getFsViewProcessor(catalog).getCacheStats());
    return stats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ protected void doInitialize() throws UserException {
queryInstant = snapshotInstant.get().getTimestamp();
snapshotTimestamp = Option.empty();
}
fsView = hmsTable.getHudiFsView(hudiClient);
fsView = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getFsViewProcessor(hmsTable.getCatalog())
.getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
Expand Down Expand Up @@ -1330,9 +1330,8 @@ private static TFetchSchemaTableDataResult metaCacheStatsMetadataResult(TSchemaT
fillBatch(dataBatch, cache.getStats(), catalog.getName());
}
// 2. hudi cache
HudiCachedPartitionProcessor processor
= (HudiCachedPartitionProcessor) mgr.getHudiPartitionProcess(catalog);
fillBatch(dataBatch, processor.getCacheStats(), catalog.getName());
HudiMetadataCacheMgr hudiMetadataCacheMgr = mgr.getHudiMetadataCacheMgr();
fillBatch(dataBatch, hudiMetadataCacheMgr.getCacheStats(catalog), catalog.getName());
} else if (catalogIf instanceof IcebergExternalCatalog) {
// 3. iceberg cache
IcebergMetadataCache icebergCache = mgr.getIcebergMetadataCache();
Expand Down

0 comments on commit d74fea1

Please sign in to comment.