Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: scale-out namenodes for nnbench #276

Merged
merged 4 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {
this(nameNodeUri, conf, null);
}

public DFSClient createDfsClient(URI nameNodeUri, Configuration conf) throws IOException {
return new DFSClient(nameNodeUri, conf, null);
}

/**
* Same as this(nameNodeUri, null, conf, stats);
* @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ public static void setParent(final long id, final long parent) {
}


public static void setParents(final long parent) {
public static void setParents(final long oldparent, final long newparent) {
try {
DatabaseConnection obj = Database.getInstance().getConnection();
String env = System.getenv("DATABASE");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ public void waitUntilInstalled() throws InterruptedException {
}
}

public String[] getNNUrls() {
HashSet<String> urls = new HashSet<>();
ImmutableList<MountEntry> entries = this.mounts;
for (MountEntry entry : entries) {
urls.add(entry.fsUri);
}
return urls.toArray(new String[urls.size()]);
}

public void dump() {
ImmutableList<MountEntry> entries = this.mounts;
StringBuilder result = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private int quotaInitThreads;

private final int inodeXAttrsLimit; //inode xattrs max limit
private boolean localNN = true;

// A set of directories that have been protected using the
// dfs.namenode.protected.directories setting. These directories cannot
Expand Down Expand Up @@ -212,7 +213,7 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final String supergroup;
private final INodeId inodeId;

private final MountsManager mountsManager;
private MountsManager mountsManager = null;
private final FSEditLog editLog;

private HdfsFileStatus[] reservedStatuses;
Expand Down Expand Up @@ -411,10 +412,24 @@ public enum DirOp {

initUsersToBypassExtProvider(conf);

// initialize a mount manager
mountsManager = new MountsManager();
mountsManager.init(new HdfsConfiguration());
mountsManager.start();
String enableNNProxy = System.getenv("ENABLE_NN_PROXY");
if (enableNNProxy != null) {
if (Boolean.parseBoolean(enableNNProxy)) {
String NNProxyQuorum = System.getenv("NNPROXY_ZK_QUORUM");
String NNProxyMountTablePath = System.getenv("NNPROXY_MOUNT_TABLE_ZKPATH");
if (NNProxyQuorum != null && NNProxyMountTablePath != null) {
// initialize a mount manager
mountsManager = new MountsManager();
mountsManager.init(new HdfsConfiguration());
mountsManager.start();
localNN = false;
}
}
}
}

public boolean isLocalNN() {
return localNN;
}

private void initUsersToBypassExtProvider(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,7 @@ void writeFields(DataOutputStream out) throws IOException {
// FSImageSerialization.writeString(path, out);
FSImageSerialization.writeLong(timestamp, out); // mtime
FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
permissions.write(out);
// permissions.write(out);
AclEditLogUtil.write(aclEntries, out);
XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,29 +781,17 @@ public boolean addChild(
node.setLocalName(DFSUtil.string2Bytes(name));

// get mount point from zookeeper
boolean local = true;
String[] address = new String[2];
String enableNNProxy = System.getenv("ENABLE_NN_PROXY");
if (enableNNProxy != null) {
if (Boolean.parseBoolean(enableNNProxy)) {
String NNProxyQuorum = System.getenv("NNPROXY_ZK_QUORUM");
String NNProxyMountTablePath = System.getenv("NNPROXY_MOUNT_TABLE_ZKPATH");
if (NNProxyQuorum != null && NNProxyMountTablePath != null) {
local = false;
try {
// LOG.info(existingPath + " : " + mpoint);
String mpoint = FSDirectory.getInstance().getMountsManager().resolve(existingPath);
address = mpoint.replace("hdfs://","").split(":");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

if (local) {
if (FSDirectory.getInstance().isLocalNN()) {
localRename(node);
} else {
String[] address = new String[2];
try {
// LOG.info(existingPath + " : " + mpoint);
String mpoint = FSDirectory.getInstance().getMountsManager().resolve(existingPath);
address = mpoint.replace("hdfs://","").split(":");
} catch (Exception e) {
e.printStackTrace();
}
remoteRename(node, address[0]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public int getLength() {
* #clonePermissionStatus(INodeWithAdditionalFields)} and {@link
* #updatePermissionStatus(PermissionStatusFormat, long)} should not modify it.
*/
private long permission = 0L;
private long permission = -1L;

private long modificationTime = -1L;
private long accessTime = -1L;
Expand Down Expand Up @@ -390,7 +390,7 @@ void setPermission(FsPermission permission) {

@Override
public long getPermissionLong() {
if (permission == 0L) {
if (permission == -1L) {
permission = DatabaseINode.getPermission(getId());
}
return permission;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -88,6 +89,7 @@
import org.apache.log4j.LogManager;

import org.apache.hadoop.hdfs.db.*;
import org.apache.hadoop.hdfs.nnproxy.server.mount.MountsManager;

/**
* Main class for a series of name-node benchmarks.
Expand All @@ -111,10 +113,14 @@ public class NNThroughputBenchmark implements Tool {
private static final String GENERAL_OPTIONS_USAGE =
"[-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";

private static MountsManager mountsManager;
boolean local = true;

static Configuration config;
static NameNode nameNode;
static NamenodeProtocol nameNodeProto;
static ClientProtocol clientProto;
static HashMap<String, ClientProtocol> nnProtos;
static DatanodeProtocol dataNodeProto;
static RefreshUserMappingsProtocol refreshUserMappingsProto;
static String bpid = null;
Expand All @@ -140,6 +146,22 @@ public class NNThroughputBenchmark implements Tool {
config.set(DFSConfigKeys.DFS_HOSTS, "${hadoop.tmp.dir}/dfs/hosts/include");
File includeFile = new File(config.get(DFSConfigKeys.DFS_HOSTS, "include"));
new FileOutputStream(includeFile).close();

String enableNNProxy = System.getenv("ENABLE_NN_PROXY");
if (enableNNProxy != null) {
if (Boolean.parseBoolean(enableNNProxy)) {
String NNProxyQuorum = System.getenv("NNPROXY_ZK_QUORUM");
String NNProxyMountTablePath = System.getenv("NNPROXY_MOUNT_TABLE_ZKPATH");
if (NNProxyQuorum != null && NNProxyMountTablePath != null) {
// initialize a mount manager
mountsManager = new MountsManager();
mountsManager.init(new HdfsConfiguration());
mountsManager.start();
local = false;
nnProtos = new HashMap<String, ClientProtocol>();
}
}
}
}

void close() {
Expand Down Expand Up @@ -607,16 +629,23 @@ String getExecutionArgument(int daemonId) {
@Override
long executeOp(int daemonId, int inputIdx, String clientName)
throws IOException {
ClientProtocol cp = null;
if (local) {
cp = clientProto;
} else {
cp = nnProtos.get(mountsManager.resolve(fileNames[daemonId][inputIdx]));
}

long start = Time.now();
// dummyActionNoSynch(fileIdx);
clientProto.create(fileNames[daemonId][inputIdx],
cp.create(fileNames[daemonId][inputIdx],
FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null);
long end = Time.now();
for (boolean written = !closeUponCreate; !written;
written = clientProto.complete(fileNames[daemonId][inputIdx],
written = cp.complete(fileNames[daemonId][inputIdx],
clientName, null, HdfsConstants.GRANDFATHER_INODE_ID)) {
};
return end-start;
Expand Down Expand Up @@ -794,8 +823,16 @@ long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
String fname = fileNames[daemonId][inputIdx];
fname = fname.replace("open", "create");

ClientProtocol cp = null;
if (local) {
cp = clientProto;
} else {
cp = nnProtos.get(mountsManager.resolve(fname));
}

long start = Time.now();
clientProto.getBlockLocations(fname, 0L, BLOCK_SIZE);
cp.getBlockLocations(fname, 0L, BLOCK_SIZE);
long end = Time.now();
return end-start;
}
Expand Down Expand Up @@ -1617,6 +1654,18 @@ public int run(String[] aArgs) throws Exception {
refreshUserMappingsProto =
DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnRealUri);
getBlockPoolId(dfs);

// init multiple client protos according to the mount table
String[] nnUrls = null;
if (!local) {
nnUrls = mountsManager.getNNUrls();
for (String url : nnUrls) {
URI nameNodeUri = URI.create(url);
ClientProtocol cp = dfs.getClient().createDfsClient(nameNodeUri, getConf()).getNamenode();
cp.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false);
nnProtos.put(url, cp);
}
}
}
// run each benchmark
long beforeUsedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
Expand Down