Skip to content

Commit

Permalink
some rename
Browse files Browse the repository at this point in the history
  • Loading branch information
meiyi authored and mymeiyi committed May 30, 2019
1 parent a0df2e4 commit eac82c0
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -56,8 +57,8 @@
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.HDFSAclHelper.PathHelper;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,10 +95,10 @@
*/
@CoreCoprocessor
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class HDFSAclController implements MasterCoprocessor, MasterObserver {
private static final Logger LOG = LoggerFactory.getLogger(HDFSAclController.class);
public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);

private HDFSAclHelper hdfsAclHelper = null;
private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
private PathHelper pathHelper = null;
private FileSystem fs = null;
/** Provider for mapping principal names to Users */
Expand All @@ -117,8 +118,8 @@ public void preMasterInitialization(final ObserverContext<MasterCoprocessorEnvir
throw new IOException("Does not implement HMasterServices");
}
MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
hdfsAclHelper =
new HDFSAclHelper(masterServices.getConfiguration(), masterServices.getConnection());
hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
masterServices.getConnection());
pathHelper = hdfsAclHelper.getPathHelper();
fs = pathHelper.getFileSystem();
hdfsAclHelper.setCommonDirPermission();
Expand All @@ -132,14 +133,20 @@ public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
// check if hbase:acl table has 'm' CF
TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies())
.anyMatch(family -> Bytes.equals(family.getName(), HDFSAclStorage.HDFS_ACL_FAMILY));
boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(
family -> Bytes.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
if (!containHdfsAclFamily) {
TableDescriptorBuilder builder =
TableDescriptorBuilder.newBuilder(tableDescriptor).setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(HDFSAclStorage.HDFS_ACL_FAMILY).build());
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
admin.modifyTable(builder.build());
}
} else {
LOG.error("Table {} is not yet created. {} should be configured after the {} Coprocessor",
PermissionStorage.ACL_TABLE_NAME, getClass().getSimpleName(),
AccessController.class.getSimpleName());
throw new TableNotFoundException(
"Table " + PermissionStorage.ACL_TABLE_NAME + " is not yet created");
}
}
this.userProvider = UserProvider.instantiate(ctx.getEnvironment().getConfiguration());
Expand Down Expand Up @@ -171,7 +178,7 @@ public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvi
hdfsAclHelper.addTableAcl(desc.getTableName(), owner);
try (Table aclTable =
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
HDFSAclStorage.addUserTableHdfsAcl(aclTable, owner, desc.getTableName());
SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, owner, desc.getTableName());
}
}
}
Expand Down Expand Up @@ -217,10 +224,10 @@ public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<String> removeUsers = new HashSet<>();
try (Table aclTable =
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
List<String> users = HDFSAclStorage.getTableUsers(aclTable, tableName);
HDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
List<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
for (String user : users) {
List<byte[]> userEntries = HDFSAclStorage.getUserEntries(aclTable, user);
List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
boolean remove = true;
for (byte[] entry : userEntries) {
if (PermissionStorage.isGlobalEntry(entry)) {
Expand Down Expand Up @@ -253,7 +260,7 @@ public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ct
if (hdfsAclHelper != null) {
try (Table aclTable =
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
HDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, namespace);
SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, namespace);
}
Path tmpNsDir = pathHelper.getTmpNsDir(namespace);
if (fs.exists(tmpNsDir)) {
Expand All @@ -279,7 +286,8 @@ public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
case GLOBAL:
UserPermission perm = getUserGlobalPermission(conf, userName);
if (containReadPermission(perm)) {
List<byte[]> userEntries = HDFSAclStorage.getUserEntries(aclTable, userName);
List<byte[]> userEntries =
SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
Set<String> skipNamespaces = new HashSet<>();
Set<TableName> skipTables = new HashSet<>();
for (byte[] entry : userEntries) {
Expand All @@ -290,7 +298,7 @@ public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
}
}
hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
HDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
} else {
revokeUserGlobalPermission(aclTable, userName, userPermission);
}
Expand All @@ -300,8 +308,9 @@ public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
((NamespacePermission) userPermission.getPermission()).getNamespace();
UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
if (containReadPermission(nsPerm)) {
if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
List<byte[]> userEntries = HDFSAclStorage.getUserEntries(aclTable, userName);
if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
List<byte[]> userEntries =
SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
Set<TableName> skipTables = new HashSet<>();
for (byte[] entry : userEntries) {
if (!PermissionStorage.isNamespaceEntry(entry)
Expand All @@ -311,7 +320,7 @@ public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
}
hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
}
HDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
} else {
revokeUserNamespacePermission(aclTable, userName, namespace, userPermission);
}
Expand All @@ -324,11 +333,12 @@ public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
break;
}
if (containReadPermission(tPerm)) {
if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName) && !HDFSAclStorage
.hasUserNamespaceHdfsAcl(aclTable, userName, tableName.getNamespaceAsString())) {
if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
&& !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
tableName.getNamespaceAsString())) {
hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
}
HDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
} else {
revokeUserTablePermission(aclTable, userName, tableName, userPermission);
}
Expand Down Expand Up @@ -378,7 +388,7 @@ private void revokeUserGlobalPermission(Table aclTable, String userName,
// remove user global acls but reserve ns and table acls
Set<String> skipNamespaces = new HashSet<>();
Set<TableName> skipTables = new HashSet<>();
List<byte[]> userEntries = HDFSAclStorage.getUserEntries(aclTable, userName);
List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
for (byte[] entry : userEntries) {
if (PermissionStorage.isNamespaceEntry(entry)) {
skipNamespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
Expand All @@ -393,33 +403,34 @@ private void revokeUserGlobalPermission(Table aclTable, String userName,
}
}
hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, filterTableNames);
HDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
}

private void revokeUserNamespacePermission(Table aclTable, String userName, String namespace,
UserPermission userPermission) throws IOException {
// remove user ns acls but reserve table acls
if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
Set<TableName> skipTables = new HashSet<>();
List<byte[]> userEntries = HDFSAclStorage.getUserEntries(aclTable, userName);
List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
for (byte[] entry : userEntries) {
if (!PermissionStorage.isNamespaceEntry(entry) && !PermissionStorage.isGlobalEntry(entry)) {
skipTables.add(TableName.valueOf(entry));
}
}
hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
}
HDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
}

private void revokeUserTablePermission(Table aclTable, String userName, TableName tableName,
UserPermission userPermission) throws IOException {
if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName) && !HDFSAclStorage
.hasUserNamespaceHdfsAcl(aclTable, userName, tableName.getNamespaceAsString())) {
if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
&& !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
tableName.getNamespaceAsString())) {
// remove table acls
hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
}
HDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
}

private boolean containReadPermission(UserPermission userPermission) {
Expand Down Expand Up @@ -461,11 +472,11 @@ private UserPermission getUserTablePermission(Configuration conf, String userNam
}

private boolean isHdfsAclEnabled(Configuration configuration) {
return configuration.getBoolean(HDFSAclHelper.HDFS_ACL_ENABLE, false);
return configuration.getBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, false);
}

protected static final class HDFSAclStorage {
public static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
protected static final class SnapshotScannerHDFSAclStorage {
static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");

static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
import static org.apache.hadoop.fs.permission.AclEntryType.USER;
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
Expand All @@ -44,6 +45,7 @@
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
Expand All @@ -64,35 +66,38 @@
* A helper to set HBase granted user access acl and default acl over hFiles.
*/
@InterfaceAudience.Private
public class HDFSAclHelper implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HDFSAclHelper.class);
public class SnapshotScannerHDFSAclHelper implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class);

public static final String HDFS_ACL_ENABLE = "hbase.hdfs.acl.enable";
public static final String HDFS_ACL_THREAD_NUMBER = "hbase.hdfs.acl.thread.number";
public static final String USER_SCAN_SNAPSHOT_ENABLE = "hbase.user.scan.snapshot.enable";
public static final String USER_SCAN_SNAPSHOT_THREAD_NUMBER =
"hbase.user.scan.snapshot.thread.number";
// the tmp directory to restore snapshot, it can not be a sub directory of HBase root dir
public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir";
public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT =
"/hbase/.tmpdir-to-restore-snapshot";
// If enable this feature, set public directories permission to 751
public static final FsPermission ACL_ENABLE_PUBLIC_HFILE_PERMISSION =
new FsPermission((short) 0751);
public static final String COMMON_DIRECTORY_PERMISSION =
"hbase.user.scan.snapshot.common.directory.permission";
public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751";
// If enable this feature, set restore directory permission to 703
public static final FsPermission ACL_ENABLE_RESTORE_HFILE_PERMISSION =
new FsPermission((short) 0703);
public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION =
"hbase.user.scan.snapshot.restore.directory.permission";
public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "703";

private Admin admin;
private final Configuration conf;
private FileSystem fs;
private PathHelper pathHelper;
private ExecutorService pool;

public HDFSAclHelper(Configuration configuration, Connection connection)
public SnapshotScannerHDFSAclHelper(Configuration configuration, Connection connection)
throws IOException {
this.conf = configuration;
this.pathHelper = new PathHelper(conf);
this.fs = pathHelper.getFileSystem();
this.pathHelper = new PathHelper(conf);
this.pool = Executors.newFixedThreadPool(conf.getInt(HDFS_ACL_THREAD_NUMBER, 10),
this.pool = Executors.newFixedThreadPool(conf.getInt(USER_SCAN_SNAPSHOT_THREAD_NUMBER, 10),
new ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build());
if (connection == null) {
connection = ConnectionFactory.createConnection(conf);
Expand Down Expand Up @@ -122,14 +127,16 @@ public void setCommonDirPermission() throws IOException {
if (!fs.exists(path)) {
fs.mkdirs(path);
}
fs.setPermission(path, ACL_ENABLE_PUBLIC_HFILE_PERMISSION);
fs.setPermission(path, new FsPermission(
conf.get(COMMON_DIRECTORY_PERMISSION, COMMON_DIRECTORY_PERMISSION_DEFAULT)));
}
// create snapshot restore directory
Path restoreDir =
new Path(conf.get(SNAPSHOT_RESTORE_TMP_DIR, SNAPSHOT_RESTORE_TMP_DIR_DEFAULT));
if (!fs.exists(restoreDir)) {
fs.mkdirs(restoreDir);
fs.setPermission(restoreDir, ACL_ENABLE_RESTORE_HFILE_PERMISSION);
fs.setPermission(restoreDir, new FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
}
}

Expand Down Expand Up @@ -235,10 +242,9 @@ public boolean resetTableAcl(TableName tableName) {
public boolean removeNamespaceAcl(TableName tableName, Set<String> removeUsers) {
try {
long start = System.currentTimeMillis();
List<AclEntry> aclEntries = removeUsers.stream()
.map(removeUser -> new AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(USER)
.setName(removeUser).setPermission(FsAction.READ_EXECUTE).build())
.collect(Collectors.toList());
List<AclEntry> aclEntries =
removeUsers.stream().map(removeUser -> aclEntry(ACCESS, removeUser, READ_EXECUTE))
.collect(Collectors.toList());
Path nsPath = pathHelper.getDataNsDir(tableName.getNamespaceAsString());
fs.removeAclEntries(nsPath, aclEntries);
LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
Expand All @@ -259,11 +265,9 @@ public void addTableAcl(TableName tableName, String user) {
try {
long start = System.currentTimeMillis();
List<AclEntry> aclEntries = new ArrayList<>(2);
AclEntry accessAclEntry = new AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(USER)
.setName(user).setPermission(FsAction.READ_EXECUTE).build();
AclEntry accessAclEntry = aclEntry(ACCESS, user, READ_EXECUTE);
aclEntries.add(accessAclEntry);
aclEntries.add(new AclEntry.Builder().setScope(AclEntryScope.DEFAULT).setType(USER)
.setName(user).setPermission(FsAction.READ_EXECUTE).build());
aclEntries.add(aclEntry(DEFAULT, user, READ_EXECUTE));
// set access and default HDFS acl for table dir
fs.modifyAclEntries(pathHelper.getTmpTableDir(tableName), aclEntries);
fs.modifyAclEntries(pathHelper.getArchiveTableDir(tableName), aclEntries);
Expand Down Expand Up @@ -540,6 +544,12 @@ private CompletableFuture<Void> setHDFSAclParallel(List<HDFSAclOperation> operat
return future;
}

private static AclEntry aclEntry(AclEntryScope scope, String name, FsAction action) {
return new AclEntry.Builder().setScope(scope)
.setType(AuthUtil.isGroupPrincipal(name) ? GROUP : USER).setName(name).setPermission(action)
.build();
}

/**
* Inner class used to describe modify or remove what acl entries for files or directories(and
* child files)
Expand Down Expand Up @@ -629,11 +639,6 @@ private List<AclEntry> getDefaultAclEntries(Set<String> users, FsAction action)
}
return dirAclList;
}

private AclEntry aclEntry(AclEntryScope scope, String name, FsAction action) {
return new AclEntry.Builder().setScope(scope).setType(USER).setName(name)
.setPermission(action).build();
}
}

protected static final class PathHelper {
Expand Down
Loading

0 comments on commit eac82c0

Please sign in to comment.