[#3337] improvement(hadoop-catalog): Support user impersonation for Hadoop catalog. #3634

Merged: 4 commits, May 30, 2024
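Before the file-by-file diff, a minimal sketch of the catalog properties a client might pass to turn this feature on. The two authentication.* keys are defined by AuthenticationConfig in this PR; the kerberos.* and impersonation keys below are assumed placeholders standing in for whatever KerberosConfig (not shown in this diff) actually defines.

// Hedged sketch: catalog properties enabling Kerberos auth plus impersonation.
// "authentication.enable" and "authentication.type" come from AuthenticationConfig
// in this PR; the remaining keys are assumed placeholders for KerberosConfig's keys.
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class HadoopCatalogAuthPropertiesSketch {
  public static Map<String, String> properties() {
    return ImmutableMap.<String, String>builder()
        .put("location", "hdfs://namenode:8020/user/gravitino") // placeholder
        .put("authentication.enable", "true")
        .put("authentication.type", "kerberos")
        .put("authentication.kerberos.principal", "gravitino@EXAMPLE.COM") // assumed key
        .put("authentication.kerberos.keytab-uri", "hdfs:///keytabs/gravitino.keytab") // assumed key
        .put("authentication.impersonation-enable", "true") // assumed key
        .build();
  }
}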
3 changes: 3 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -34,6 +34,9 @@ dependencies {
testImplementation(project(":server"))
testImplementation(project(":server-common"))

testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)

testImplementation(libs.bundles.log4j)
testImplementation(libs.mockito.core)
testImplementation(libs.mysql.driver)
@@ -4,12 +4,15 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.file.FilesetCatalog;
import com.datastrato.gravitino.rel.SupportsSchemas;
import java.util.Map;
import java.util.Optional;

/**
* Hadoop catalog is a fileset catalog that can manage filesets on the Hadoop Compatible File
@@ -43,4 +46,12 @@ public SupportsSchemas asSchemas() {
public FilesetCatalog asFilesetCatalog() {
return (HadoopCatalogOperations) ops();
}

protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) {
boolean impersonationEnabled = new KerberosConfig(config).isImpersonationEnabled();
if (!impersonationEnabled) {
return Optional.empty();
}
return Optional.of(new HadoopProxyPlugin());
}
}
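newProxyPlugin only wires in a HadoopProxyPlugin when impersonation is switched on. KerberosConfig itself is not part of this diff, so the following is only a guess at its impersonation gate, mirroring the AuthenticationConfig pattern that appears later in the PR; the property key and version are assumptions.

// Hypothetical sketch of the flag KerberosConfig might expose; the real class
// is not in this diff. Follows the AuthenticationConfig pattern shown below.
package com.datastrato.gravitino.catalog.hadoop.kerberos;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigConstants;
import com.datastrato.gravitino.config.ConfigEntry;
import java.util.Map;

public class KerberosConfigSketch extends Config {
  // Assumed key name; the actual key lives in KerberosConfig.
  public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable";

  public static final ConfigEntry<Boolean> IMPERSONATION_ENTRY =
      new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
          .doc("Whether to enable user impersonation for the Hadoop catalog")
          .version(ConfigConstants.VERSION_0_5_1)
          .booleanConf()
          .createWithDefault(false);

  public KerberosConfigSketch(Map<String, String> properties) {
    super(false);
    loadFromMap(properties, k -> true);
  }

  public boolean isImpersonationEnabled() {
    return get(IMPERSONATION_ENTRY);
  }
}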
@@ -5,16 +5,20 @@
package com.datastrato.gravitino.catalog.hadoop;

import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosClient;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.PropertiesMetadata;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.FilesetAlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
@@ -36,6 +40,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
@@ -47,6 +52,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -72,7 +78,15 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas

@VisibleForTesting Optional<Path> catalogStorageLocation;

// For testing only.
private Map<String, String> conf;

@SuppressWarnings("unused")
private ProxyPlugin proxyPlugin;

private String kerberosRealm;

private CatalogInfo catalogInfo;

HadoopCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -81,10 +95,16 @@ public HadoopCatalogOperations() {
this(GravitinoEnv.getInstance().entityStore());
}

public String getKerberosRealm() {
return kerberosRealm;
}

@Override
public void initialize(Map<String, String> config, CatalogInfo info) throws RuntimeException {
// Initialize Hadoop Configuration.
this.conf = config;
this.hadoopConf = new Configuration();
this.catalogInfo = info;
Map<String, String> bypassConfigs =
config.entrySet().stream()
.filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
@@ -98,9 +118,31 @@ public void initialize(Map<String, String> config, CatalogInfo info) throws RuntimeException {
(String)
CATALOG_PROPERTIES_METADATA.getOrDefault(
config, HadoopCatalogPropertiesMetadata.LOCATION);
conf.forEach(hadoopConf::set);

initAuthentication(conf, hadoopConf);

this.catalogStorageLocation = Optional.ofNullable(catalogLocation).map(Path::new);
}

private void initAuthentication(Map<String, String> conf, Configuration hadoopConf) {
AuthenticationConfig config = new AuthenticationConfig(conf);
boolean enableAuth = config.isEnableAuth();
String authType = config.getAuthType();

if (enableAuth && StringUtils.equalsIgnoreCase(authType, "kerberos")) {
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(hadoopConf);
try {
KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf);
File keytabFile = kerberosClient.saveKeyTabFileFromUri(catalogInfo.id());
this.kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath());
} catch (IOException e) {
throw new RuntimeException("Failed to login with kerberos", e);
}
}
}

@Override
public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException {
try {
@@ -609,4 +651,8 @@ static Path formalizePath(Path path, Configuration configuration) throws IOException {
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) {
this.proxyPlugin = hadoopProxyPlugin;
}
}
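initAuthentication boils down to a standard UGI keytab login. Below is a standalone sketch of that flow using only stock Hadoop APIs; the principal and keytab path are placeholders, and KerberosClient's extra work (fetching the keytab, deriving the realm) is skipped here.

// Standalone sketch of the UGI Kerberos login that initAuthentication performs.
// Principal and keytab path are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiLoginSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(conf);
    // Equivalent of KerberosClient.login(keytabFile.getAbsolutePath()):
    UserGroupInformation.loginUserFromKeytab(
        "gravitino@EXAMPLE.COM", "/tmp/gravitino.keytab"); // placeholders
    System.out.println("Logged in as: " + UserGroupInformation.getLoginUser());
  }
}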
@@ -4,6 +4,8 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
@@ -29,6 +31,9 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
null,
false /* hidden */))
.putAll(BASIC_CATALOG_PROPERTY_ENTRIES)
// The following two are about authentication.
.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();

@Override
@@ -0,0 +1,74 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.utils.Executable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopProxyPlugin implements ProxyPlugin {
private HadoopCatalogOperations ops;
private UserGroupInformation realUser;

public HadoopProxyPlugin() {
try {
realUser = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
throw new IllegalStateException("Failed to initialize HadoopProxyPlugin", ioe);
}
}

@Override
public Object doAs(
Principal principal, Executable<Object, Exception> action, Map<String, String> properties)
throws Throwable {
try {
UserGroupInformation proxyUser;

if (UserGroupInformation.isSecurityEnabled() && ops != null) {
// The Gravitino server may use multiple KDC servers: HTTP authentication can
// use one KDC while the Hadoop catalog uses another. The KerberosAuthenticator
// strips the realm from the principal, so we re-append the Hadoop catalog's
// realm here.
String proxyKerberosPrincipalName = principal.getName();
if (!proxyKerberosPrincipalName.contains("@")) {
proxyKerberosPrincipalName =
String.format("%s@%s", proxyKerberosPrincipalName, ops.getKerberosRealm());
}

proxyUser = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, realUser);
} else {
proxyUser = UserGroupInformation.createProxyUser(principal.getName(), realUser);
}

return proxyUser.doAs((PrivilegedExceptionAction<Object>) action::execute);
} catch (UndeclaredThrowableException e) {
Throwable innerException = e.getCause();
if (innerException instanceof PrivilegedActionException) {
throw innerException.getCause();
} else if (innerException instanceof InvocationTargetException) {
throw innerException.getCause();
} else {
throw innerException;
}
}
}

@Override
public void bindCatalogOperation(CatalogOperations ops) {
this.ops = ((HadoopCatalogOperations) ops);
this.ops.setProxyPlugin(this);
}
}
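createProxyUser only takes effect if the Hadoop cluster trusts the real (service) user as a proxy; the server invokes doAs around each catalog operation, but HDFS will reject the impersonation without proxy-user rules. A hedged sketch of that core-site.xml prerequisite, expressed as Configuration calls; "gravitino" is an assumed short name for the service principal, and production settings should be narrower than "*".

// Hadoop-side prerequisite for impersonation (hedged sketch): the cluster's
// core-site.xml must allow the Gravitino service user to proxy other users.
import org.apache.hadoop.conf.Configuration;

public class ProxyUserPrereqSketch {
  public static Configuration withProxyUserRules() {
    Configuration conf = new Configuration();
    conf.set("hadoop.proxyuser.gravitino.hosts", "*");  // hosts allowed to proxy from
    conf.set("hadoop.proxyuser.gravitino.groups", "*"); // groups that may be proxied
    return conf;
  }
}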
@@ -0,0 +1,69 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.hadoop.kerberos;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigConstants;
import com.datastrato.gravitino.config.ConfigEntry;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class AuthenticationConfig extends Config {
public static final String ENABLE_AUTH_KEY = "authentication.enable";
public static final String AUTH_TYPE_KEY = "authentication.type";

public AuthenticationConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
}

public static final ConfigEntry<Boolean> ENABLE_AUTH_ENTRY =
new ConfigBuilder(ENABLE_AUTH_KEY)
.doc("Whether to enable authentication for Hadoop catalog")
.version(ConfigConstants.VERSION_0_5_1)
.booleanConf()
.createWithDefault(false);

public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
new ConfigBuilder(AUTH_TYPE_KEY)
.doc("The type of authentication for Hadoop catalog, currently we only support kerberos")
.version(ConfigConstants.VERSION_0_5_1)
.stringConf()
.create();

public boolean isEnableAuth() {
return get(ENABLE_AUTH_ENTRY);
}

public String getAuthType() {
return get(AUTH_TYPE_ENTRY);
}

public static final Map<String, PropertyEntry<?>> AUTHENTICATION_PROPERTY_ENTRIES =
new ImmutableMap.Builder<String, PropertyEntry<?>>()
.put(
ENABLE_AUTH_KEY,
PropertyEntry.booleanPropertyEntry(
ENABLE_AUTH_KEY,
"Whether to enable authentication for Hadoop catalog",
false,
true,
false,
false,
false))
.put(
AUTH_TYPE_KEY,
PropertyEntry.stringImmutablePropertyEntry(
AUTH_TYPE_KEY,
"The type of authentication for Hadoop catalog, currently we only support kerberos",
false,
null,
false,
false))
.build();
}
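A quick usage sketch of the class above: the raw catalog property map goes straight into the constructor, matching what initAuthentication does earlier in this diff.

// Usage sketch for AuthenticationConfig; mirrors initAuthentication above.
import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class AuthenticationConfigUsageSketch {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of("authentication.enable", "true", "authentication.type", "kerberos");
    AuthenticationConfig config = new AuthenticationConfig(props);
    System.out.println(config.isEnableAuth()); // prints: true
    System.out.println(config.getAuthType());  // prints: kerberos
  }
}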
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.hadoop.kerberos;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FetchFileUtils {

private FetchFileUtils() {}

public static void fetchFileFromUri(
String fileUri, File destFile, int timeout, Configuration conf) throws IOException {
try {
URI uri = new URI(fileUri);
String scheme = Optional.ofNullable(uri.getScheme()).orElse("file");

switch (scheme) {
case "http":
case "https":
case "ftp":
FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
break;

case "file":
Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());
break;

case "hdfs":
FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI()));
break;

default:
throw new IllegalArgumentException(
String.format("Doesn't support the scheme %s", scheme));
}
} catch (URISyntaxException ue) {
throw new IllegalArgumentException("The uri of file has the wrong format", ue);
}
}
}
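A usage sketch for the utility above: fetching a keytab from HDFS into a local file. Note that a scheme-less or file: URI is symlinked rather than copied, while http/https/ftp go through commons-io with the timeout converted to milliseconds. URIs and paths here are placeholders.

// Usage sketch for FetchFileUtils; URIs and paths are placeholders.
import com.datastrato.gravitino.catalog.hadoop.kerberos.FetchFileUtils;
import java.io.File;
import org.apache.hadoop.conf.Configuration;

public class FetchFileUtilsUsageSketch {
  public static void main(String[] args) throws Exception {
    File dest = new File("/tmp/gravitino.keytab");
    // Copies hdfs:///keytabs/gravitino.keytab to the local destination,
    // using a 10-second timeout for remote schemes.
    FetchFileUtils.fetchFileFromUri(
        "hdfs:///keytabs/gravitino.keytab", dest, 10, new Configuration());
  }
}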