-
Notifications
You must be signed in to change notification settings - Fork 395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#596] feat(hive): Hive catalog supports to impersonate users to execute operations in simple mode. #1450
[#596] feat(hive): Hive catalog supports to impersonate users to execute operations in simple mode. #1450
Changes from all commits
b5e44c2
ef17af3
1020290
312c808
b2ec506
e07f7be
8c867ca
0d206e8
eb4f93d
ca5851b
0b4d45c
0b9294b
479cdee
78570b5
15727d7
8ed95aa
9a0c641
8e050c5
e8c5e19
d4d6f89
aa630e4
63d1368
8f3c6fe
0994a5a
0b0ba2e
23c55bb
5a0c0de
970898f
2255deb
0180884
393f69e
a35bf29
74d6ed1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package com.datastrato.gravitino.catalog.hive; | ||
|
||
import com.datastrato.gravitino.utils.ClientPool; | ||
import com.github.benmanes.caffeine.cache.Cache; | ||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import com.github.benmanes.caffeine.cache.Scheduler; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.Lists; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.List; | ||
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
import javax.annotation.Nullable; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.hive.metastore.IMetaStoreClient; | ||
import org.apache.hadoop.security.UserGroupInformation; | ||
import org.apache.thrift.TException; | ||
import org.immutables.value.Value; | ||
|
||
/** | ||
* Referred from Apache Iceberg's CachedClientPool implementation | ||
* hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java | ||
* | ||
* <p>ClientPoolCache is used for every HiveCatalog, I changed the type of `clientPoolCache` from | ||
* static variable to variable. I change cache key from user and configuration options to the | ||
* username. | ||
* | ||
* <p>A ClientPool that caches the underlying HiveClientPool instances. | ||
*/ | ||
public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can see the comment |
||
|
||
private final Cache<Key, HiveClientPool> clientPoolCache; | ||
|
||
private final Configuration conf; | ||
private final int clientPoolSize; | ||
|
||
CachedClientPool(int clientPoolSize, Configuration conf, long evictionInterval) { | ||
this.conf = conf; | ||
this.clientPoolSize = clientPoolSize; | ||
// Since Caffeine does not ensure that removalListener will be involved after expiration | ||
// We use a scheduler with one thread to clean up expired clients. | ||
this.clientPoolCache = | ||
Caffeine.newBuilder() | ||
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) | ||
.removalListener((ignored, value, cause) -> ((HiveClientPool) value).close()) | ||
.scheduler( | ||
Scheduler.forScheduledExecutorService( | ||
new ScheduledThreadPoolExecutor(1, newDaemonThreadFactory()))) | ||
.build(); | ||
} | ||
|
||
@VisibleForTesting | ||
HiveClientPool clientPool() { | ||
Key key = extractKey(); | ||
return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf)); | ||
} | ||
|
||
@VisibleForTesting | ||
Cache<Key, HiveClientPool> clientPoolCache() { | ||
return clientPoolCache; | ||
} | ||
|
||
@Override | ||
public <R> R run(Action<R, IMetaStoreClient, TException> action) | ||
throws TException, InterruptedException { | ||
return clientPool().run(action); | ||
} | ||
|
||
@Override | ||
public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry) | ||
throws TException, InterruptedException { | ||
return clientPool().run(action, retry); | ||
} | ||
|
||
@VisibleForTesting | ||
static Key extractKey() { | ||
List<Object> elements = Lists.newArrayList(); | ||
try { | ||
elements.add(UserGroupInformation.getCurrentUser().getUserName()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure why the Key contains multiple user names. Shouldn't each user have their own client pool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Elements contain one user name. Why do we use elements. It's because Iceberg uses elements. There are other config options in the elements. For Gravitino, our cached keys are only username. I reserve the elements. Because it's easy to extend other cache keys. |
||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
|
||
return Key.of(elements); | ||
} | ||
|
||
@Value.Immutable | ||
abstract static class Key { | ||
|
||
abstract List<Object> elements(); | ||
|
||
private static Key of(Iterable<?> elements) { | ||
return ImmutableKey.builder().elements(elements).build(); | ||
} | ||
} | ||
|
||
@Value.Immutable | ||
abstract static class ConfElement { | ||
abstract String key(); | ||
|
||
@Nullable | ||
abstract String value(); | ||
|
||
static ConfElement of(String key, String value) { | ||
return ImmutableConfElement.builder().key(key).value(value).build(); | ||
} | ||
} | ||
|
||
private static ThreadFactory newDaemonThreadFactory() { | ||
return new ThreadFactoryBuilder() | ||
.setDaemon(true) | ||
.setNameFormat("hive-metastore-cleaner" + "-%d") | ||
.build(); | ||
} | ||
|
||
public void close() { | ||
clientPoolCache.invalidateAll(); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add some comments in this file about which part is modified by you? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,9 +6,11 @@ | |
|
||
import com.datastrato.gravitino.catalog.BaseCatalog; | ||
import com.datastrato.gravitino.catalog.CatalogOperations; | ||
import com.datastrato.gravitino.catalog.ProxyPlugin; | ||
import com.datastrato.gravitino.rel.SupportsSchemas; | ||
import com.datastrato.gravitino.rel.TableCatalog; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
/** Implementation of a Hive catalog in Gravitino. */ | ||
public class HiveCatalog extends BaseCatalog<HiveCatalog> { | ||
|
@@ -43,7 +45,7 @@ protected CatalogOperations newOps(Map<String, String> config) { | |
*/ | ||
@Override | ||
public SupportsSchemas asSchemas() { | ||
return (HiveCatalogOperations) ops(); | ||
return (SupportsSchemas) ops(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to change this, seems not so necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we use the dynamic proxy. The dynmiac proxy can only be used for interfaces. |
||
} | ||
|
||
/** | ||
|
@@ -53,6 +55,18 @@ public SupportsSchemas asSchemas() { | |
*/ | ||
@Override | ||
public TableCatalog asTableCatalog() { | ||
return (HiveCatalogOperations) ops(); | ||
return (TableCatalog) ops(); | ||
} | ||
|
||
@Override | ||
protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) { | ||
boolean impersonationEnabled = | ||
(boolean) | ||
new HiveCatalogPropertiesMeta() | ||
.getOrDefault(config, HiveCatalogPropertiesMeta.IMPERSONATION_ENABLE); | ||
if (!impersonationEnabled) { | ||
return Optional.empty(); | ||
} | ||
return Optional.of(new HiveProxyPlugin()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you should also update license.bin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.