Skip to content

Commit

Permalink
[#596] feat(hive): Hive catalog supports to impersonate users to exec…
Browse files Browse the repository at this point in the history
…ute operations in simple mode. (#1450)

### What changes were proposed in this pull request?
The Hive catalog now supports impersonating users when executing operations in
simple mode.
For Kerberos mode, I have created a new issue and will finish it in a
later pull request.
We use a Hive client cache pool modeled on the Iceberg cache pool, with the
user name as the key of the cache pool.
### Why are the changes needed?

Fix: #596 

### Does this PR introduce _any_ user-facing change?
Yes, we will add a new document.

### How was this patch tested?
Add a new IT

---------

Co-authored-by: Heng Qin <[email protected]>
  • Loading branch information
qqqttt123 and Heng Qin authored Jan 24, 2024
1 parent 411e8b9 commit c045c31
Show file tree
Hide file tree
Showing 19 changed files with 712 additions and 34 deletions.
2 changes: 2 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,15 @@

Apache Iceberg
./api/src/main/java/com/datastrato/gravitino/exceptions/RESTException.java
./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/CachedClientPool.java
./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveClientPool.java
./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/dyn/DynConstructors.java
./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/dyn/DynFields.java
./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/dyn/DynMethods.java
./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/MiniHiveMetastore.java
./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/MiniHiveMetastoreService.java
./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/ScriptRunner.java
./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestCachedClientPool.java
./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/IcebergExceptionMapper.java
./clients/client-java/src/main/java/com/datastrato/gravitino/client/HTTPClient.java
./clients/client-java/src/main/java/com/datastrato/gravitino/client/RESTClient.java
Expand Down
1 change: 1 addition & 0 deletions LICENSE.bin
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@
Okio
J2ObjC
SQLite JDBC Driver
Immutables

This product bundles various third-party components also under the
Apache Software Foundation License 1.1
Expand Down
4 changes: 4 additions & 0 deletions catalogs/catalog-hive/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ dependencies {
compileOnly(libs.lombok)
annotationProcessor(libs.lombok)

compileOnly(libs.immutables.value)
annotationProcessor(libs.immutables.value)

implementation(libs.hive2.metastore) {
exclude("org.apache.hbase")
exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
Expand Down Expand Up @@ -68,6 +71,7 @@ dependencies {

implementation(libs.slf4j.api)
implementation(libs.guava)
implementation(libs.caffeine)

testImplementation(libs.junit.jupiter.api)
testRuntimeOnly(libs.junit.jupiter.engine)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datastrato.gravitino.catalog.hive;

import com.datastrato.gravitino.utils.ClientPool;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.immutables.value.Value;

/**
 * A {@link ClientPool} that caches the underlying {@link HiveClientPool} instances, one per user.
 *
 * <p>Referred from Apache Iceberg's CachedClientPool implementation
 * hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
 *
 * <p>Differences from the Iceberg implementation: a client pool cache is created for every
 * HiveCatalog, so {@code clientPoolCache} is an instance field instead of a static one, and the
 * cache key is built from the current user name only, instead of the user plus configuration
 * options.
 */
public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {

  private final Cache<Key, HiveClientPool> clientPoolCache;

  // Kept as a field so that close() can stop the cleaner thread instead of leaking it.
  private final ScheduledThreadPoolExecutor scheduler;

  private final Configuration conf;
  private final int clientPoolSize;

  /**
   * Creates a cache of per-user Hive client pools.
   *
   * @param clientPoolSize size passed to every {@link HiveClientPool} created by this cache
   * @param conf configuration passed to every {@link HiveClientPool} created by this cache
   * @param evictionInterval time in milliseconds after the last access at which a cached pool
   *     expires
   */
  CachedClientPool(int clientPoolSize, Configuration conf, long evictionInterval) {
    this.conf = conf;
    this.clientPoolSize = clientPoolSize;
    this.scheduler = new ScheduledThreadPoolExecutor(1, newDaemonThreadFactory());
    // Since Caffeine does not ensure that removalListener will be invoked right after expiration,
    // we use a scheduler with one thread to clean up expired clients.
    this.clientPoolCache =
        Caffeine.newBuilder()
            .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
            .removalListener(
                (ignored, value, cause) -> {
                  // The listener signature allows a null value; never fail the cleanup on it.
                  if (value != null) {
                    ((HiveClientPool) value).close();
                  }
                })
            .scheduler(Scheduler.forScheduledExecutorService(scheduler))
            .build();
  }

  /**
   * Returns the client pool cached for the current user, creating it on first use.
   *
   * @return the {@link HiveClientPool} associated with the key from {@link #extractKey()}
   */
  @VisibleForTesting
  HiveClientPool clientPool() {
    Key key = extractKey();
    return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf));
  }

  /** Exposes the underlying cache, for tests only. */
  @VisibleForTesting
  Cache<Key, HiveClientPool> clientPoolCache() {
    return clientPoolCache;
  }

  /** Runs the action on the client pool of the current user. */
  @Override
  public <R> R run(Action<R, IMetaStoreClient, TException> action)
      throws TException, InterruptedException {
    return clientPool().run(action);
  }

  /** Runs the action on the client pool of the current user, forwarding the retry flag. */
  @Override
  public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
      throws TException, InterruptedException {
    return clientPool().run(action, retry);
  }

  /**
   * Builds the cache key for the calling context; the key holds only the current user name.
   *
   * @return the key identifying the current user
   * @throws UncheckedIOException if the current user cannot be determined
   */
  @VisibleForTesting
  static Key extractKey() {
    List<Object> elements = Lists.newArrayList();
    try {
      elements.add(UserGroupInformation.getCurrentUser().getUserName());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    return Key.of(elements);
  }

  /** Immutable cache key made of an ordered list of elements. */
  @Value.Immutable
  abstract static class Key {

    abstract List<Object> elements();

    private static Key of(Iterable<?> elements) {
      return ImmutableKey.builder().elements(elements).build();
    }
  }

  /** Immutable key/value pair describing a single configuration entry. */
  @Value.Immutable
  abstract static class ConfElement {
    abstract String key();

    @Nullable
    abstract String value();

    static ConfElement of(String key, String value) {
      return ImmutableConfElement.builder().key(key).value(value).build();
    }
  }

  /** Creates the factory for the daemon thread that performs scheduled cache clean-up. */
  private static ThreadFactory newDaemonThreadFactory() {
    return new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("hive-metastore-cleaner" + "-%d")
        .build();
  }

  /**
   * Invalidates every cached client pool and stops the cleaner thread. Calling this method more
   * than once is harmless.
   */
  public void close() {
    clientPoolCache.invalidateAll();
    // Without this the single-threaded scheduler would outlive the pool: one leaked thread for
    // every catalog that gets closed.
    scheduler.shutdownNow();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

import com.datastrato.gravitino.catalog.BaseCatalog;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.ProxyPlugin;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
import java.util.Optional;

/** Implementation of a Hive catalog in Gravitino. */
public class HiveCatalog extends BaseCatalog<HiveCatalog> {
Expand Down Expand Up @@ -43,7 +45,7 @@ protected CatalogOperations newOps(Map<String, String> config) {
*/
@Override
public SupportsSchemas asSchemas() {
return (HiveCatalogOperations) ops();
return (SupportsSchemas) ops();
}

/**
Expand All @@ -53,6 +55,18 @@ public SupportsSchemas asSchemas() {
*/
@Override
public TableCatalog asTableCatalog() {
return (HiveCatalogOperations) ops();
return (TableCatalog) ops();
}

/**
 * Creates the proxy plugin for this catalog when the impersonation property is enabled.
 *
 * @param config the catalog configuration
 * @return a {@link HiveProxyPlugin} if impersonation is enabled, otherwise an empty Optional
 */
@Override
protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) {
  HiveCatalogPropertiesMeta propertiesMeta = new HiveCatalogPropertiesMeta();
  Object enabled =
      propertiesMeta.getOrDefault(config, HiveCatalogPropertiesMeta.IMPERSONATION_ENABLE);
  if ((boolean) enabled) {
    return Optional.of(new HiveProxyPlugin());
  }
  return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog.hive;

import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS;
import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.CLIENT_POOL_SIZE;
import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;
import static com.datastrato.gravitino.catalog.hive.HiveTable.SUPPORT_TABLE_TYPES;
Expand Down Expand Up @@ -44,7 +45,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -72,7 +72,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas

public static final Logger LOG = LoggerFactory.getLogger(HiveCatalogOperations.class);

@VisibleForTesting HiveClientPool clientPool;
@VisibleForTesting CachedClientPool clientPool;

@VisibleForTesting HiveConf hiveConf;

Expand Down Expand Up @@ -136,14 +136,20 @@ public void initialize(Map<String, String> conf) throws RuntimeException {
mergeConfig.forEach(hadoopConf::set);
hiveConf = new HiveConf(hadoopConf, HiveCatalogOperations.class);

this.clientPool = new HiveClientPool(getClientPoolSize(conf), hiveConf);
this.clientPool =
new CachedClientPool(getClientPoolSize(conf), hiveConf, getCacheEvictionInterval(conf));
}

/**
 * Looks up the client pool size through the catalog properties metadata.
 *
 * @param conf the catalog configuration
 * @return the value resolved for {@code CLIENT_POOL_SIZE}
 */
@VisibleForTesting
int getClientPoolSize(Map<String, String> conf) {
  Object poolSize = catalogPropertiesMetadata.getOrDefault(conf, CLIENT_POOL_SIZE);
  return (int) poolSize;
}

/**
 * Looks up the client pool cache eviction interval through the catalog properties metadata.
 *
 * @param conf the catalog configuration
 * @return the value resolved for {@code CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS}
 */
long getCacheEvictionInterval(Map<String, String> conf) {
  Object evictionInterval =
      catalogPropertiesMetadata.getOrDefault(conf, CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
  return (long) evictionInterval;
}

/** Closes the Hive catalog and releases the associated client pool. */
@Override
public void close() {
Expand Down Expand Up @@ -206,7 +212,7 @@ public HiveSchema createSchema(
.withConf(hiveConf)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreator(UserGroupInformation.getCurrentUser().getUserName())
.withCreateTime(Instant.now())
.build())
.build();
Expand Down Expand Up @@ -603,7 +609,7 @@ public Table createTable(
.withSortOrders(sortOrders)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreator(UserGroupInformation.getCurrentUser().getUserName())
.withCreateTime(Instant.now())
.build())
.withPartitioning(partitioning)
Expand Down Expand Up @@ -941,23 +947,6 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo
}
}

// TODO. We should figure out a better way to get the current user from servlet container.
/**
 * Resolves the name of the current user.
 *
 * @return the short name of the current Hadoop user, or the {@code user.name} system property
 *     when the Hadoop user cannot be obtained or is null
 */
private static String currentUser() {
  try {
    String hadoopUser = UserGroupInformation.getCurrentUser().getShortUserName();
    if (hadoopUser != null) {
      return hadoopUser;
    }
  } catch (IOException e) {
    LOG.warn("Failed to get Hadoop user", e);
  }

  // Reached both when the lookup failed and when it yielded null.
  LOG.warn("Hadoop user is null, defaulting to user.name");
  return System.getProperty("user.name");
}

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return tablePropertiesMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datastrato.gravitino.catalog.PropertyEntry;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class HiveCatalogPropertiesMeta extends BaseCatalogPropertiesMetadata {

Expand All @@ -17,6 +18,16 @@ public class HiveCatalogPropertiesMeta extends BaseCatalogPropertiesMetadata {

public static final String METASTORE_URIS = "metastore.uris";

/** Catalog property key for the client pool cache eviction interval, in milliseconds. */
public static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
"client.pool-cache.eviction-interval-ms";

/** Default client pool cache eviction interval: 5 minutes, expressed in milliseconds. */
public static final long DEFAULT_CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
TimeUnit.MINUTES.toMillis(5);

/** Catalog property key that enables user impersonation for the Hive catalog. */
public static final String IMPERSONATION_ENABLE = "impersonation-enable";

/** Default for {@link #IMPERSONATION_ENABLE}: impersonation is disabled. */
public static final boolean DEFAULT_IMPERSONATION_ENABLE = false;

private static final Map<String, PropertyEntry<?>> HIVE_CATALOG_PROPERTY_ENTRIES =
ImmutableMap.<String, PropertyEntry<?>>builder()
.put(
Expand All @@ -31,6 +42,24 @@ public class HiveCatalogPropertiesMeta extends BaseCatalogPropertiesMetadata {
true,
DEFAULT_CLIENT_POOL_SIZE,
false))
.put(
CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
PropertyEntry.longOptionalPropertyEntry(
CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
"The cache pool eviction interval",
true,
DEFAULT_CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
false))
.put(
IMPERSONATION_ENABLE,
PropertyEntry.booleanPropertyEntry(
IMPERSONATION_ENABLE,
"Enable user impersonation for Hive catalog",
false,
true,
DEFAULT_IMPERSONATION_ENABLE,
false,
false))
.putAll(BASIC_CATALOG_PROPERTY_ENTRIES)
.build();

Expand Down
Loading

0 comments on commit c045c31

Please sign in to comment.