Skip to content

Commit

Permalink
fix bugs and add comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Nov 1, 2024
1 parent ac6e0be commit 8f4d800
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public class RefreshUserMappingsProtocolServerSideTranslatorPB implements Refres

private final RefreshUserMappingsProtocol impl;

protected final static RefreshUserToGroupsMappingsResponseProto
private final static RefreshUserToGroupsMappingsResponseProto
VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE =
RefreshUserToGroupsMappingsResponseProto.newBuilder().build();

protected final static RefreshSuperUserGroupsConfigurationResponseProto
private final static RefreshSuperUserGroupsConfigurationResponseProto
VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE =
RefreshSuperUserGroupsConfigurationResponseProto.newBuilder()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public static <T, R> R asyncIpcClient(
return asyncReturn(clazz);
}

/**
* Asynchronously invokes an RPC call and applies a response transformation function
* to the result on server-side.
* @param req The IPC call encapsulating the RPC request on server-side.
* @param res The function to apply to the response of the RPC call on server-side.
* @param <T> Type of the call's result.
*/
public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
final ProtobufRpcEngineCallback2 callback =
ProtobufRpcEngine2.Server.registerForDeferredResponse2();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,10 @@ public RouterNamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) {
}

@Override
public GetBlocksResponseProto getBlocks(
RpcController unused,
GetBlocksRequestProto request) {
public GetBlocksResponseProto getBlocks(RpcController unused,
GetBlocksRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return getBlocks(unused, request);
return super.getBlocks(unused, request);
}
asyncRouterServer(() -> {
DatanodeInfo dnInfo = new DatanodeInfo.DatanodeInfoBuilder()
Expand All @@ -89,11 +88,10 @@ public GetBlocksResponseProto getBlocks(
}

@Override
public GetBlockKeysResponseProto getBlockKeys(
RpcController unused,
GetBlockKeysRequestProto request) {
public GetBlockKeysResponseProto getBlockKeys(RpcController unused,
GetBlockKeysRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return getBlockKeys(unused, request);
return super.getBlockKeys(unused, request);
}
asyncRouterServer(server::getBlockKeys, keys -> {
GetBlockKeysResponseProto.Builder builder =
Expand All @@ -107,11 +105,10 @@ public GetBlockKeysResponseProto getBlockKeys(
}

@Override
public GetTransactionIdResponseProto getTransactionId(
RpcController unused,
GetTransactionIdRequestProto request) {
public GetTransactionIdResponseProto getTransactionId(RpcController unused,
GetTransactionIdRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return getTransactionId(unused, request);
return super.getTransactionId(unused, request);
}
asyncRouterServer(server::getTransactionID,
txid -> GetTransactionIdResponseProto
Expand All @@ -121,9 +118,10 @@ public GetTransactionIdResponseProto getTransactionId(

@Override
public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
RpcController unused, GetMostRecentCheckpointTxIdRequestProto request) {
RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
throws ServiceException {
if (!isAsyncRpc) {
return getMostRecentCheckpointTxId(unused, request);
return super.getMostRecentCheckpointTxId(unused, request);
}
asyncRouterServer(server::getMostRecentCheckpointTxId,
txid -> GetMostRecentCheckpointTxIdResponseProto
Expand All @@ -132,11 +130,10 @@ public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
}

@Override
public RollEditLogResponseProto rollEditLog(
RpcController unused,
RollEditLogRequestProto request) {
public RollEditLogResponseProto rollEditLog(RpcController unused,
RollEditLogRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return rollEditLog(unused, request);
return super.rollEditLog(unused, request);
}
asyncRouterServer(server::rollEditLog,
signature -> RollEditLogResponseProto.newBuilder()
Expand All @@ -145,11 +142,10 @@ public RollEditLogResponseProto rollEditLog(
}

@Override
public ErrorReportResponseProto errorReport(
RpcController unused,
ErrorReportRequestProto request) {
public ErrorReportResponseProto errorReport(RpcController unused,
ErrorReportRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return errorReport(unused, request);
return super.errorReport(unused, request);
}
asyncRouterServer(() -> {
server.errorReport(PBHelper.convert(request.getRegistration()),
Expand All @@ -161,9 +157,9 @@ public ErrorReportResponseProto errorReport(

@Override
public RegisterResponseProto registerSubordinateNamenode(
RpcController unused, RegisterRequestProto request) {
RpcController unused, RegisterRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return registerSubordinateNamenode(unused, request);
return super.registerSubordinateNamenode(unused, request);
}
asyncRouterServer(() -> server.registerSubordinateNamenode(
PBHelper.convert(request.getRegistration())),
Expand All @@ -173,11 +169,10 @@ public RegisterResponseProto registerSubordinateNamenode(
}

@Override
public StartCheckpointResponseProto startCheckpoint(
RpcController unused,
StartCheckpointRequestProto request) {
public StartCheckpointResponseProto startCheckpoint(RpcController unused,
StartCheckpointRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return startCheckpoint(unused, request);
return super.startCheckpoint(unused, request);
}
asyncRouterServer(() ->
server.startCheckpoint(PBHelper.convert(request.getRegistration())),
Expand All @@ -188,11 +183,10 @@ public StartCheckpointResponseProto startCheckpoint(


@Override
public EndCheckpointResponseProto endCheckpoint(
RpcController unused,
EndCheckpointRequestProto request) {
public EndCheckpointResponseProto endCheckpoint(RpcController unused,
EndCheckpointRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return endCheckpoint(unused, request);
return super.endCheckpoint(unused, request);
}
asyncRouterServer(() -> {
server.endCheckpoint(PBHelper.convert(request.getRegistration()),
Expand All @@ -204,9 +198,9 @@ public EndCheckpointResponseProto endCheckpoint(

@Override
public GetEditLogManifestResponseProto getEditLogManifest(
RpcController unused, GetEditLogManifestRequestProto request) {
RpcController unused, GetEditLogManifestRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return getEditLogManifest(unused, request);
return super.getEditLogManifest(unused, request);
}
asyncRouterServer(() -> server.getEditLogManifest(request.getSinceTxId()),
manifest -> GetEditLogManifestResponseProto.newBuilder()
Expand All @@ -217,9 +211,9 @@ public GetEditLogManifestResponseProto getEditLogManifest(
@Override
public VersionResponseProto versionRequest(
RpcController controller,
VersionRequestProto request) {
VersionRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return versionRequest(controller, request);
return super.versionRequest(controller, request);
}
asyncRouterServer(server::versionRequest,
info -> VersionResponseProto.newBuilder()
Expand All @@ -228,10 +222,10 @@ public VersionResponseProto versionRequest(
}

@Override
public IsUpgradeFinalizedResponseProto isUpgradeFinalized(
RpcController controller, IsUpgradeFinalizedRequestProto request) {
public IsUpgradeFinalizedResponseProto isUpgradeFinalized(RpcController controller,
IsUpgradeFinalizedRequestProto request) throws ServiceException {
if (!isAsyncRpc) {
return isUpgradeFinalized(controller, request);
return super.isUpgradeFinalized(controller, request);
}
asyncRouterServer(server::isUpgradeFinalized,
isUpgradeFinalized -> IsUpgradeFinalizedResponseProto.newBuilder()
Expand All @@ -244,7 +238,7 @@ public IsRollingUpgradeResponseProto isRollingUpgrade(
RpcController controller, IsRollingUpgradeRequestProto request)
throws ServiceException {
if (!isAsyncRpc) {
return isRollingUpgrade(controller, request);
return super.isRollingUpgrade(controller, request);
}
asyncRouterServer(server::isRollingUpgrade,
isRollingUpgrade -> IsRollingUpgradeResponseProto.newBuilder()
Expand All @@ -254,9 +248,10 @@ public IsRollingUpgradeResponseProto isRollingUpgrade(

@Override
public GetNextSPSPathResponseProto getNextSPSPath(
RpcController controller, GetNextSPSPathRequestProto request) {
RpcController controller, GetNextSPSPathRequestProto request)
throws ServiceException {
if (!isAsyncRpc) {
return getNextSPSPath(controller, request);
return super.getNextSPSPath(controller, request);
}
asyncRouterServer(server::getNextSPSPath,
nextSPSPath -> GetNextSPSPathResponseProto.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationRequestProto;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationResponseProto;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsRequestProto;
Expand All @@ -32,9 +31,18 @@

public class RouterRefreshUserMappingsProtocolServerSideTranslatorPB
extends RefreshUserMappingsProtocolServerSideTranslatorPB {

Check failure on line 34 in hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java#L34

blanks: end of line
private final RouterRpcServer server;
private final boolean isAsyncRpc;

private final static RefreshUserToGroupsMappingsResponseProto
VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE =
RefreshUserToGroupsMappingsResponseProto.newBuilder().build();

private final static RefreshSuperUserGroupsConfigurationResponseProto
VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE =
RefreshSuperUserGroupsConfigurationResponseProto.newBuilder()
.build();

public RouterRefreshUserMappingsProtocolServerSideTranslatorPB(
RefreshUserMappingsProtocol impl) {
Expand Down

0 comments on commit 8f4d800

Please sign in to comment.