[#596] feat(hive): Hive catalog supports to impersonate users to execute operations in simple mode. #1450
@BeforeAll
public static void startIntegrationTest() throws Exception {
  originHadoopUser = System.getenv(HADOOP_USER_NAME);
  setEnv(HADOOP_USER_NAME, null);
We should unset HADOOP_USER_NAME; otherwise Hadoop will still pick up HADOOP_USER_NAME when we run in unsecured mode.
What is the unsecured mode and does the user know about it?
It's a Hadoop concept. Unsecured mode is Hadoop's simple (non-Kerberos) authentication mode.
This behavior is actually hard-coded in the HiveClient implementation.
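For readers unfamiliar with simple mode, here is a rough sketch of why the test clears HADOOP_USER_NAME. As far as I understand Hadoop's behavior with Kerberos disabled, the user is taken from the HADOOP_USER_NAME environment variable, then from the system property of the same name, and only then from the OS login user. `SimpleModeUser` and `resolve` are hypothetical names used for illustration; the real logic lives inside Hadoop's UserGroupInformation.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper mirroring the assumed lookup order in simple mode.
final class SimpleModeUser {
  static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";

  // Returns the first user name found: environment, system property, OS user.
  static String resolve(Map<String, String> env, Properties sysProps, String osUser) {
    String user = env.get(HADOOP_USER_NAME);
    if (user == null) {
      user = sysProps.getProperty(HADOOP_USER_NAME);
    }
    if (user == null) {
      user = osUser;
    }
    return user;
  }
}
```

With the variable set, every operation would run as that fixed user, which would hide whether impersonation actually works.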
@mchades @jerryshao @yuqi1129 Could you help review this PR?
@FANNG1 @jerryshao @yuqi1129 @diqiu50 @Clearvive Could you help review this PR?
 *
 * <p>A ClientPool that caches the underlying HiveClientPool instances.
 */
public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {
What's the difference between CachedClientPool and HiveClientPool?
See the class comment: "A ClientPool that caches the underlying HiveClientPool instances."
    "client.pool.cache.eviction-interval-ms";

public static final long DEFAULT_CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
    TimeUnit.MINUTES.toMillis(5);;
Extra semicolon here.
Removed.
static Key extractKey() {
  List<Object> elements = Lists.newArrayList();
  try {
    elements.add(UserGroupInformation.getCurrentUser().getUserName());
I'm not sure why the Key contains multiple user names. Shouldn't each user have their own client pool?
The elements list contains a single user name. We use a list because Iceberg does: its key also holds other config options. For Gravitino, the cache key is only the user name, but I kept the list so that the key is easy to extend with other elements later.
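A minimal sketch of that idea, assuming a key that wraps a list of elements and currently holds only the user name. `PoolKey`, `forUser`, and `elements` are hypothetical names for illustration, not the classes in this PR, and the user name is passed in rather than read from UserGroupInformation so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical cache key: equality is defined by the list of elements.
final class PoolKey {
  private final List<Object> elements;

  private PoolKey(List<Object> elements) {
    this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
  }

  // Builds a key from the user name only. More elements (for example
  // configuration values) could be appended here without changing callers.
  static PoolKey forUser(String userName) {
    List<Object> elements = new ArrayList<>();
    elements.add(Objects.requireNonNull(userName, "userName"));
    return new PoolKey(elements);
  }

  List<Object> elements() {
    return elements;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof PoolKey && elements.equals(((PoolKey) o).elements);
  }

  @Override
  public int hashCode() {
    return elements.hashCode();
  }
}
```

Two keys built for the same user compare equal, so each user maps to exactly one pool in the cache.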
import java.lang.reflect.Proxy;
import java.util.Collections;

public class CatalogOperationsProxy implements InvocationHandler {
Please add Javadoc explaining how the proxy mechanism is used here.
Fixed.
import java.security.Principal;
import java.util.Map;

public interface CatalogProxyPlugin {
Please add class-level and method-level Javadoc.
Fixed.
public interface CatalogProxyPlugin {
  Object doAs(
      Principal principal, Executable<Object, Exception> action, Map<String, String> properties)
      throws Throwable;
Can we use a more detailed exception class?
No. We can't.
docs/apache-hive-catalog.md
Outdated
| `metastore.uris` | The Hive metastore service URIs, separate multiple addresses with commas. Such as `thrift://127.0.0.1:9083` | (none) | Yes | 0.2.0 |
| `client.pool-size` | The maximum number of Hive metastore clients in the pool for Gravitino. | 1 | No | 0.2.0 |
| `gravitino.bypass.` | Property name with this prefix passed down to the underlying HMS client for use. Such as `gravitino.bypass.hive.metastore.failure.retries = 3` indicate 3 times of retries upon failure of Thrift metastore calls | (none) | No | 0.2.0 |
| `client.pool.cache.eviction-interval-ms` | The cache pool eviction interval. | | No | 0.4.0 |
The default value of client.pool.cache.eviction-interval-ms is empty.
Fixed.
} else if (innerException instanceof InvocationTargetException) {
  throw innerException.getCause();
} else {
  throw e.getCause();
This should be: throw innerException
Fixed.
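To make the unwrapping order concrete, here is a small sketch under the assumption that the proxy sees an UndeclaredThrowableException whose cause may itself be a PrivilegedActionException or an InvocationTargetException. `ExceptionUnwrapper` and `unwrap` are hypothetical names; the snippet only illustrates the branch structure discussed above, not the exact code in this PR.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedActionException;

// Hypothetical helper that strips wrapper exceptions added by proxies and doAs calls.
final class ExceptionUnwrapper {
  static Throwable unwrap(Throwable t) {
    Throwable inner = t;
    // First layer: the wrapper a dynamic proxy adds around undeclared checked exceptions.
    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
      inner = t.getCause();
    }
    // Second layer: wrappers added by privileged execution or reflective invocation.
    boolean wrapped =
        inner instanceof PrivilegedActionException || inner instanceof InvocationTargetException;
    if (wrapped && inner.getCause() != null) {
      return inner.getCause();
    }
    // Anything else is already the real exception, so return it unchanged.
    return inner;
  }
}
```

The last branch is the one the review comment targets: when the inner exception is not a known wrapper, it should be rethrown as is rather than unwrapped once more.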
  return CatalogOperationsProxy.getProxy(ops, plugin);
}

protected CatalogProxyPlugin newProxyPlugin(Map<String, String> config) {
Can we make CatalogProxyPlugin invisible to the concrete catalogs? Making it a method of the catalog seems too invasive.
Actually, every catalog should be proxied, so I chose to move CatalogProxyPlugin here.
@@ -17,6 +17,9 @@ dependencies {
  compileOnly(libs.lombok)
  annotationProcessor(libs.lombok)

  compileOnly(libs.immutables.value)
  annotationProcessor(libs.immutables.value)
Maybe you should also update license.bin?
Fixed.
@@ -87,7 +87,13 @@ public CatalogOperations ops() {
  if (ops == null) {
    Preconditions.checkArgument(
        entity != null && conf != null, "entity and conf must be set before calling ops()");
    ops = newOps(conf);
    CatalogOperations newOps = newOps(conf);
If the partition API is merged, how do you handle that case?
Maybe we will have TableOperations. We will create a proxy object for it.
I think you have to figure out a solution with @mchades ASAP, before the interface settles down.
On second thought: if the returned value is a Table, I return a proxy object for it, too.
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  Object object =
      plugin.doAs(
          PrincipalUtils.getCurrentPrincipal(),
          () -> method.invoke(ops, args),
          Collections.emptyMap());
  if (object instanceof Table) {
    return createProxy(object, plugin);
  }
  return object;
}
I recover the code after discussion with @mchades if we have TableOperation.
I have other pull requests that depend on this PR. Maybe I can adapt this part to @mchades's solution in a later pull request.
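For context on the mechanism being discussed, here is a self-contained sketch of a JDK dynamic proxy that routes every call through a doAs-style plugin. All names (`DoAsPlugin`, `GreetingOps`, `DoAsProxy`) are hypothetical stand-ins, and the user is a plain string instead of a Principal; it is a simplified illustration of the pattern, not this PR's implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;

// Hypothetical plugin: runs an action on behalf of the given user.
interface DoAsPlugin {
  Object doAs(String user, Callable<Object> action) throws Exception;
}

// Hypothetical operations interface standing in for the catalog operations.
interface GreetingOps {
  String greet(String name);
}

final class DoAsProxy implements InvocationHandler {
  private final Object target;
  private final DoAsPlugin plugin;
  private final String user;

  private DoAsProxy(Object target, DoAsPlugin plugin, String user) {
    this.target = target;
    this.plugin = plugin;
    this.user = user;
  }

  // Wraps target so that every call made through iface goes through the plugin.
  static <T> T createProxy(T target, Class<T> iface, DoAsPlugin plugin, String user) {
    Object proxy =
        Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] {iface}, new DoAsProxy(target, plugin, user));
    return iface.cast(proxy);
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    try {
      return plugin.doAs(user, () -> method.invoke(target, args));
    } catch (InvocationTargetException e) {
      // Surface the exception thrown by the real method, not the reflection wrapper.
      throw e.getCause();
    }
  }
}
```

Returning a second proxy for certain return types, as in the Table case above, would be one more `instanceof` check on the result before returning it.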
}

protected ProxyPlugin newProxyPlugin(Map<String, String> config) {
  return null;
It would be better to return an Optional to avoid the null check.
Fixed.
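A short sketch of what the Optional-based variant could look like, assuming the base class returns an empty Optional by default and only catalogs that need impersonation override it. `ProxyPluginSketch`, `BaseCatalogSketch`, and `describe` are hypothetical names used only for this illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical marker interface standing in for the proxy plugin.
interface ProxyPluginSketch {}

class BaseCatalogSketch {
  // Default: no plugin, so operations are used directly.
  protected Optional<ProxyPluginSketch> newProxyPlugin(Map<String, String> config) {
    return Optional.empty();
  }

  // The caller branches on presence instead of comparing against null.
  final String describe(Map<String, String> config) {
    return newProxyPlugin(config).map(plugin -> "proxied").orElse("direct");
  }
}
```

The caller can then use `map` and `orElse` to wrap the operations only when a plugin is present.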
  this.ops = ops;
}

public static <T> T getProxy(T ops, ProxyPlugin plugin) {
Would it be better to rename this to "createProxy"?
Fixed.
        .withDataType(Types.StringType.get())
        .withComment("col_3_comment")
        .build();
return new ColumnDTO[] {col1, col2, col3};
Change all the ColumnDTO to use ColumnImpl API.
Fixed.
private static <T> T createProxyInternal(T ops, ProxyPlugin plugin, Class<?>[] interfaces) {
  return (T)
      Proxy.newProxyInstance(
          ops.getClass().getClassLoader(), interfaces, new OperationsProxy(plugin, ops));
Have you tested this feature in a deployed Gravitino? I'm not sure whether there's a classloader issue.
I have written an IT, ProxyCatalogHiveIT, to test this.
Testing with mini Gravitino is not enough; you should also verify it manually in your local environment.
Verified with the playground.
        anotherSchemaName.toLowerCase()));
Assertions.assertThrows(
    RuntimeException.class,
    () -> anotherCatalog.asSchemas().createSchema(anotherIdent, comment, properties));
Can you please verify the exception message?
Fixed.
Database db = hiveClientPool.run(client -> client.getDatabase(schemaName));
Assertions.assertEquals(EXPECT_USER, db.getOwnerName());
Assertions.assertEquals(
    EXPECT_USER, hdfs.getFileStatus(new Path(db.getLocationUri())).getOwner());
What is the default user for HDFS?
root is the default user for HDFS.
        columns,
        comment,
        ImmutableMap.of(),
        Partitioning.EMPTY_PARTITIONING));
Also here.
Please also add the necessary whitespace and comments to the code blocks. You put everything together without any whitespace or comments, which makes it hard to review.
Fixed.
  clientPool.close();
  clientPool = null;
}
// We used a cached client pool, the pool will close clients after expiration.
We still need to close all the dangling clients immediately when the catalog is closed; otherwise they will leak.
Just left one comment, please think about it @qqqttt123
I got it.
Fixed.
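To illustrate the leak concern, here is a simplified sketch of a per-user pool cache whose close() shuts down every cached pool right away instead of waiting for expiration. It uses a plain ConcurrentHashMap with no time-based eviction, which is an assumption made to keep the example self-contained; `PerUserPoolCache` and its methods are hypothetical names, not the CachedClientPool from this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache holding one closeable pool per user name.
final class PerUserPoolCache<P extends AutoCloseable> implements AutoCloseable {
  private final Map<String, P> pools = new ConcurrentHashMap<>();
  private final Function<String, P> poolFactory;

  PerUserPoolCache(Function<String, P> poolFactory) {
    this.poolFactory = poolFactory;
  }

  // Returns the pool for this user, creating it on first use.
  P poolFor(String userName) {
    return pools.computeIfAbsent(userName, poolFactory);
  }

  int size() {
    return pools.size();
  }

  // Closes and removes every cached pool immediately.
  @Override
  public void close() {
    for (String user : pools.keySet()) {
      P pool = pools.remove(user);
      if (pool != null) {
        try {
          pool.close();
        } catch (Exception e) {
          // Keep going so one failing pool does not leak the others.
        }
      }
    }
  }
}
```

Calling close() from the catalog's own close path would release all clients at once, regardless of the eviction interval.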
What changes were proposed in this pull request?
The Hive catalog now supports impersonating users when executing operations in simple mode.
For Kerberos mode, I have created a new issue and will finish it in a later pull request.
We use a cached Hive client pool modeled on the Iceberg cache pool, with the user name as the cache key.
Why are the changes needed?
Fix: #596
Does this PR introduce any user-facing change?
Yes, we will add a new document.
How was this patch tested?
Added a new integration test (IT).