From 75b2b043a1c9b67ef2e9466a5646252a9d6b6dcb Mon Sep 17 00:00:00 2001 From: Symious Date: Fri, 3 Sep 2021 17:11:58 +0800 Subject: [PATCH 1/3] HDFS-16210. Add the option of refreshCallQueue to RouterAdmin --- .../federation/router/RouterAdminServer.java | 22 ++++++++++- .../hdfs/tools/federation/RouterAdmin.java | 37 +++++++++++++++++++ .../federation/router/TestRouterAdminCLI.java | 9 +++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index b44142fb17055..39ff20b58113c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -81,11 +81,15 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos; +import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; @@ -102,7 +106,7 @@ * router. It is created, started, and stopped by {@link Router}. */ public class RouterAdminServer extends AbstractService - implements RouterAdminProtocol { + implements RouterAdminProtocol, RefreshCallQueueProtocol { private static final Logger LOG = LoggerFactory.getLogger(RouterAdminServer.class); @@ -197,8 +201,16 @@ public RouterAdminServer(Configuration conf, Router router) GenericRefreshProtocolProtos.GenericRefreshProtocolService. newReflectiveBlockingService(genericRefreshXlator); + RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator = + new RefreshCallQueueProtocolServerSideTranslatorPB(this); + BlockingService refreshCallQueueService = + RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService. + newReflectiveBlockingService(refreshCallQueueXlator); + DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, genericRefreshService, adminServer); + DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, + refreshCallQueueService, adminServer); } /** @@ -764,4 +776,12 @@ public boolean refreshSuperUserGroupsConfiguration() throws IOException { ProxyUsers.refreshSuperUserGroupsConfiguration(); return true; } + + @Override // RefreshCallQueueProtocol + public void refreshCallQueue() throws IOException { + LOG.info("Refreshing call queue."); + + Configuration conf = new Configuration(); + router.getRpcServer().getServer().refreshCallQueue(conf); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index 7422989d6aad2..8e860356e9e6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -77,6 +77,8 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -388,6 +390,8 @@ public int run(String[] argv) throws Exception { exitCode = genericRefresh(argv, i); } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) { exitCode = refreshSuperUserGroupsConfiguration(); + } else if ("-refreshCallQueue".equals(cmd)) { + exitCode = refreshCallQueue(); } else { throw new IllegalArgumentException("Unknown Command: " + cmd); } @@ -1258,6 +1262,39 @@ public int genericRefresh(String[] argv, int i) throws IOException { } } + /** + * Refresh Router's call Queue + * + * @throws IOException if the operation was not successful. + */ + private int refreshCallQueue() throws IOException { + Configuration conf = getConf(); + String hostport = getConf().getTrimmed( + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); + + // Create the client + Class xface = RefreshCallQueueProtocolPB.class; + InetSocketAddress address = NetUtils.createSocketAddr(hostport); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class); + RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB) RPC.getProxy( + xface, RPC.getProtocolVersion(xface), address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), 0); + + int returnCode = -1; + try (RefreshCallQueueProtocolClientSideTranslatorPB xlator = + new RefreshCallQueueProtocolClientSideTranslatorPB(proxy)) { + xlator.refreshCallQueue(); + System.out.println("Refresh call queue successful for " + hostport); + returnCode = 0; + } catch (IOException ioe){ + System.out.println("Refresh call queue failed for " + hostport); + } + return returnCode; + } + /** * Normalize a path for that filesystem. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index 1daff053ed5a4..4178e323c54f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -1740,6 +1740,15 @@ public void testErrorFaultTolerant() throws Exception { assertEquals(0, ToolRunner.run(admin, argv)); } + @Test + public void testRefreshCallQueue() throws Exception { + + System.setOut(new PrintStream(out)); + String[] argv = new String[]{"-refreshCallQueue"}; + assertEquals(0, ToolRunner.run(admin, argv)); + assertTrue(out.toString().contains("Refresh call queue successful")); + } + private void addMountTable(String src, String nsId, String dst) throws Exception { String[] argv = new String[] {"-add", src, nsId, dst}; From de4f45fa8fc6d9b4486654cb2bba0a2afb0624a1 Mon Sep 17 00:00:00 2001 From: Symious Date: Fri, 3 Sep 2021 20:59:25 +0800 Subject: [PATCH 2/3] HDFS-16210. Fix checkstyle --- .../federation/router/RouterAdminServer.java | 4 ++-- .../hadoop/hdfs/tools/federation/RouterAdmin.java | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index 39ff20b58113c..d2b20bc4e8d23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -781,7 +781,7 @@ public boolean refreshSuperUserGroupsConfiguration() throws IOException { public void refreshCallQueue() throws IOException { LOG.info("Refreshing call queue."); - Configuration conf = new Configuration(); - router.getRpcServer().getServer().refreshCallQueue(conf); + Configuration configuration = new Configuration(); + router.getRpcServer().getServer().refreshCallQueue(configuration); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index 8e860356e9e6e..3018053901e7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -1263,15 +1263,15 @@ public int genericRefresh(String[] argv, int i) throws IOException { } /** - * Refresh Router's call Queue + * Refresh Router's call Queue. * * @throws IOException if the operation was not successful. */ private int refreshCallQueue() throws IOException { Configuration conf = getConf(); String hostport = getConf().getTrimmed( - RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, - RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); // Create the client Class xface = RefreshCallQueueProtocolPB.class; @@ -1279,13 +1279,13 @@ private int refreshCallQueue() throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class); - RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB) RPC.getProxy( - xface, RPC.getProtocolVersion(xface), address, ugi, conf, - NetUtils.getDefaultSocketFactory(conf), 0); + RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)RPC.getProxy( + xface, RPC.getProtocolVersion(xface), address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), 0); int returnCode = -1; try (RefreshCallQueueProtocolClientSideTranslatorPB xlator = - new RefreshCallQueueProtocolClientSideTranslatorPB(proxy)) { + new RefreshCallQueueProtocolClientSideTranslatorPB(proxy)) { xlator.refreshCallQueue(); System.out.println("Refresh call queue successful for " + hostport); returnCode = 0; From 3ade7622250b39d226330984a02e865dd27bc2aa Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 6 Sep 2021 17:28:02 +0800 Subject: [PATCH 3/3] HDFS-16210. Change successful to successfully --- .../org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java | 4 ++-- .../hdfs/server/federation/router/TestRouterAdminCLI.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index 3018053901e7d..deadf3d3132c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -1287,10 +1287,10 @@ private int refreshCallQueue() throws IOException { try (RefreshCallQueueProtocolClientSideTranslatorPB xlator = new RefreshCallQueueProtocolClientSideTranslatorPB(proxy)) { xlator.refreshCallQueue(); - System.out.println("Refresh call queue successful for " + hostport); + System.out.println("Refresh call queue successfully for " + hostport); returnCode = 0; } catch (IOException ioe){ - System.out.println("Refresh call queue failed for " + hostport); + System.out.println("Refresh call queue unsuccessfully for " + hostport); } return returnCode; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index 4178e323c54f3..4134b49d9af09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -1746,7 +1746,7 @@ public void testRefreshCallQueue() throws Exception { System.setOut(new PrintStream(out)); String[] argv = new String[]{"-refreshCallQueue"}; assertEquals(0, ToolRunner.run(admin, argv)); - assertTrue(out.toString().contains("Refresh call queue successful")); + assertTrue(out.toString().contains("Refresh call queue successfully")); } private void addMountTable(String src, String nsId, String dst)