diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java index d04a61e686285..ffab0f1c4876d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java @@ -20,16 +20,21 @@ import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext; import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine2; +import org.apache.hadoop.ipc.ProtobufRpcEngineCallback2; import org.apache.hadoop.ipc.internal.ShadedProtobufHelper; +import org.apache.hadoop.thirdparty.protobuf.Message; import org.apache.hadoop.util.concurrent.AsyncGet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; @@ -96,6 +101,45 @@ public static R asyncIpcClient( return asyncReturn(clazz); } + /** + * Asynchronously invokes an RPC call and applies a response transformation function + * to the result on server-side. + * @param req The IPC call encapsulating the RPC request on server-side. + * @param res The function to apply to the response of the RPC call on server-side. + * @param Type of the call's result. + */ + public static void asyncRouterServer(ServerReq req, ServerRes res) { + final ProtobufRpcEngineCallback2 callback = + ProtobufRpcEngine2.Server.registerForDeferredResponse2(); + + CompletableFuture completableFuture = + CompletableFuture.completedFuture(null); + completableFuture.thenCompose(o -> { + try { + req.req(); + return (CompletableFuture) AsyncUtil.getAsyncUtilCompletableFuture(); + } catch (Exception e) { + throw new CompletionException(e); + } + }).handle((result, e) -> { + LOG.debug("Async response, callback: {}, CallerContext: {}, result: [{}], exception: [{}]", + callback, CallerContext.getCurrent(), result, e); + if (e == null) { + Message value = null; + try { + value = res.res(result); + } catch (Exception re) { + callback.error(re); + return null; + } + callback.setResponse(value); + } else { + callback.error(e.getCause()); + } + return null; + }); + } + /** * Sets the executor used for handling responses asynchronously within * the utility class. @@ -105,4 +149,14 @@ public static R asyncIpcClient( public static void setWorker(Executor worker) { AsyncRpcProtocolPBUtil.worker = worker; } + + @FunctionalInterface + interface ServerReq { + T req() throws Exception; + } + + @FunctionalInterface + interface ServerRes { + Message res(T result) throws Exception; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000000..e70240681d27e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,1769 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.permission.FsCreateModes; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveDefaultAclResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.SetAclResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSnapshotResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteSnapshotResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DisallowSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DisallowSnapshotResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupStatsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLocatedFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLocatedFileInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SatisfyStoragePolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.DisableErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.DisableErasureCodingPolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.EnableErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.EnableErasureCodingPolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECTopologyResultForPoliciesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.RemoveErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.RemoveErasureCodingPolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrResponseProto; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; +import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto; +import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; +import org.apache.hadoop.thirdparty.protobuf.ByteString; +import org.apache.hadoop.thirdparty.protobuf.ProtocolStringList; +import org.apache.hadoop.thirdparty.protobuf.RpcController; +import org.apache.hadoop.thirdparty.protobuf.ServiceException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer; + +public class RouterClientNamenodeProtocolServerSideTranslatorPB + extends ClientNamenodeProtocolServerSideTranslatorPB { + + private final RouterRpcServer server; + + public RouterClientNamenodeProtocolServerSideTranslatorPB( + ClientProtocol server) throws IOException { + super(server); + this.server = (RouterRpcServer) server; + } + + @Override + public GetBlockLocationsResponseProto getBlockLocations( + RpcController controller, GetBlockLocationsRequestProto req) { + asyncRouterServer(() -> server.getBlockLocations(req.getSrc(), req.getOffset(), + req.getLength()), + b -> { + GetBlockLocationsResponseProto.Builder builder + = GetBlockLocationsResponseProto + .newBuilder(); + if (b != null) { + builder.setLocations(PBHelperClient.convert(b)).build(); + } + return builder.build(); + }); + return null; + } + + @Override + public GetServerDefaultsResponseProto getServerDefaults( + RpcController controller, GetServerDefaultsRequestProto req) { + asyncRouterServer(server::getServerDefaults, + result -> GetServerDefaultsResponseProto.newBuilder() + .setServerDefaults(PBHelperClient.convert(result)) + .build()); + return null; + } + + + @Override + public CreateResponseProto create(RpcController controller, CreateRequestProto req) { + asyncRouterServer(() -> { + FsPermission masked = req.hasUnmasked() ? + FsCreateModes.create(PBHelperClient.convert(req.getMasked()), + PBHelperClient.convert(req.getUnmasked())) : + PBHelperClient.convert(req.getMasked()); + return server.create(req.getSrc(), + masked, req.getClientName(), + PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(), + (short) req.getReplication(), req.getBlockSize(), + PBHelperClient.convertCryptoProtocolVersions( + req.getCryptoProtocolVersionList()), + req.getEcPolicyName(), req.getStoragePolicy()); + }, result -> { + if (result != null) { + return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result)) + .build(); + } + return VOID_CREATE_RESPONSE; + }); + return null; + } + + @Override + public AppendResponseProto append(RpcController controller, + AppendRequestProto req) { + asyncRouterServer(() -> { + EnumSetWritable flags = req.hasFlag() ? + PBHelperClient.convertCreateFlag(req.getFlag()) : + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)); + return server.append(req.getSrc(), + req.getClientName(), flags); + }, result -> { + AppendResponseProto.Builder builder = + AppendResponseProto.newBuilder(); + if (result.getLastBlock() != null) { + builder.setBlock(PBHelperClient.convertLocatedBlock( + result.getLastBlock())); + } + if (result.getFileStatus() != null) { + builder.setStat(PBHelperClient.convert(result.getFileStatus())); + } + return builder.build(); + }); + return null; + } + + @Override + public SetReplicationResponseProto setReplication( + RpcController controller, + SetReplicationRequestProto req) { + asyncRouterServer(() -> + server.setReplication(req.getSrc(), (short) req.getReplication()), + result -> SetReplicationResponseProto.newBuilder().setResult(result).build()); + return null; + } + + + @Override + public SetPermissionResponseProto setPermission( + RpcController controller, + SetPermissionRequestProto req) { + asyncRouterServer(() -> { + server.setPermission(req.getSrc(), PBHelperClient.convert(req.getPermission())); + return null; + }, result -> VOID_SET_PERM_RESPONSE); + return null; + } + + @Override + public SetOwnerResponseProto setOwner( + RpcController controller, + SetOwnerRequestProto req) { + asyncRouterServer(() -> { + server.setOwner(req.getSrc(), + req.hasUsername() ? req.getUsername() : null, + req.hasGroupname() ? req.getGroupname() : null); + return null; + }, result -> VOID_SET_OWNER_RESPONSE); + return null; + } + + @Override + public AbandonBlockResponseProto abandonBlock( + RpcController controller, + AbandonBlockRequestProto req) { + asyncRouterServer(() -> { + server.abandonBlock(PBHelperClient.convert(req.getB()), req.getFileId(), + req.getSrc(), req.getHolder()); + return null; + }, result -> VOID_ADD_BLOCK_RESPONSE); + return null; + } + + @Override + public AddBlockResponseProto addBlock( + RpcController controller, + AddBlockRequestProto req) { + asyncRouterServer(() -> { + List excl = req.getExcludeNodesList(); + List favor = req.getFavoredNodesList(); + EnumSet flags = + PBHelperClient.convertAddBlockFlags(req.getFlagsList()); + return server.addBlock( + req.getSrc(), + req.getClientName(), + req.hasPrevious() ? PBHelperClient.convert(req.getPrevious()) : null, + (excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl + .toArray(new HdfsProtos.DatanodeInfoProto[excl.size()])), req.getFileId(), + (favor == null || favor.size() == 0) ? null : favor + .toArray(new String[favor.size()]), + flags); + }, result -> AddBlockResponseProto.newBuilder() + .setBlock(PBHelperClient.convertLocatedBlock(result)).build()); + return null; + } + + @Override + public GetAdditionalDatanodeResponseProto getAdditionalDatanode( + RpcController controller, GetAdditionalDatanodeRequestProto req) { + asyncRouterServer(() -> { + List existingList = req.getExistingsList(); + List existingStorageIDsList = req.getExistingStorageUuidsList(); + List excludesList = req.getExcludesList(); + LocatedBlock result = server.getAdditionalDatanode(req.getSrc(), + req.getFileId(), PBHelperClient.convert(req.getBlk()), + PBHelperClient.convert(existingList.toArray( + new HdfsProtos.DatanodeInfoProto[existingList.size()])), + existingStorageIDsList.toArray( + new String[existingStorageIDsList.size()]), + PBHelperClient.convert(excludesList.toArray( + new HdfsProtos.DatanodeInfoProto[excludesList.size()])), + req.getNumAdditionalNodes(), req.getClientName()); + return result; + }, result -> GetAdditionalDatanodeResponseProto.newBuilder() + .setBlock( + PBHelperClient.convertLocatedBlock(result)) + .build()); + return null; + } + + @Override + public CompleteResponseProto complete( + RpcController controller, + CompleteRequestProto req) { + asyncRouterServer(() -> { + boolean result = + server.complete(req.getSrc(), req.getClientName(), + req.hasLast() ? PBHelperClient.convert(req.getLast()) : null, + req.hasFileId() ? req.getFileId() : HdfsConstants.GRANDFATHER_INODE_ID); + return result; + }, result -> CompleteResponseProto.newBuilder().setResult(result).build()); + return null; + } + + @Override + public ReportBadBlocksResponseProto reportBadBlocks( + RpcController controller, + ReportBadBlocksRequestProto req) { + asyncRouterServer(() -> { + List bl = req.getBlocksList(); + server.reportBadBlocks(PBHelperClient.convertLocatedBlocks( + bl.toArray(new HdfsProtos.LocatedBlockProto[bl.size()]))); + return null; + }, result -> VOID_REP_BAD_BLOCK_RESPONSE); + return null; + } + + @Override + public ConcatResponseProto concat( + RpcController controller, + ConcatRequestProto req) { + asyncRouterServer(() -> { + List srcs = req.getSrcsList(); + server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()])); + return null; + }, result -> VOID_CONCAT_RESPONSE); + return null; + } + + @Override + public RenameResponseProto rename( + RpcController controller, + RenameRequestProto req) { + asyncRouterServer(() -> { + return server.rename(req.getSrc(), req.getDst()); + }, result -> RenameResponseProto.newBuilder().setResult(result).build()); + return null; + } + + @Override + public Rename2ResponseProto rename2( + RpcController controller, + Rename2RequestProto req) { + asyncRouterServer(() -> { + // resolve rename options + ArrayList optionList = new ArrayList(); + if (req.getOverwriteDest()) { + optionList.add(Options.Rename.OVERWRITE); + } + if (req.hasMoveToTrash() && req.getMoveToTrash()) { + optionList.add(Options.Rename.TO_TRASH); + } + + if (optionList.isEmpty()) { + optionList.add(Options.Rename.NONE); + } + server.rename2(req.getSrc(), req.getDst(), + optionList.toArray(new Options.Rename[optionList.size()])); + return null; + }, result -> VOID_RENAME2_RESPONSE); + return null; + } + + @Override + public TruncateResponseProto truncate( + RpcController controller, + TruncateRequestProto req) { + asyncRouterServer(() -> server.truncate(req.getSrc(), req.getNewLength(), + req.getClientName()), + result -> TruncateResponseProto.newBuilder().setResult(result).build()); + return null; + } + + @Override + public DeleteResponseProto delete( + RpcController controller, + DeleteRequestProto req) { + asyncRouterServer(() -> server.delete(req.getSrc(), req.getRecursive()), + result -> DeleteResponseProto.newBuilder().setResult(result).build()); + return null; + } + + @Override + public MkdirsResponseProto mkdirs( + RpcController controller, + MkdirsRequestProto req) { + asyncRouterServer(() -> { + FsPermission masked = req.hasUnmasked() ? + FsCreateModes.create(PBHelperClient.convert(req.getMasked()), + PBHelperClient.convert(req.getUnmasked())) : + PBHelperClient.convert(req.getMasked()); + boolean result = server.mkdirs(req.getSrc(), masked, + req.getCreateParent()); + return result; + }, result -> MkdirsResponseProto.newBuilder().setResult(result).build()); + return null; + } + + @Override + public GetListingResponseProto getListing( + RpcController controller, + GetListingRequestProto req) { + asyncRouterServer(() -> { + DirectoryListing result = server.getListing( + req.getSrc(), req.getStartAfter().toByteArray(), + req.getNeedLocation()); + return result; + }, result -> { + if (result != null) { + return GetListingResponseProto.newBuilder().setDirList( + PBHelperClient.convert(result)).build(); + } else { + return VOID_GETLISTING_RESPONSE; + } + }); + return null; + } + + @Override + public GetBatchedListingResponseProto getBatchedListing( + RpcController controller, + GetBatchedListingRequestProto request) { + asyncRouterServer(() -> { + BatchedDirectoryListing result = server.getBatchedListing( + request.getPathsList().toArray(new String[]{}), + request.getStartAfter().toByteArray(), + request.getNeedLocation()); + return result; + }, result -> { + if (result != null) { + GetBatchedListingResponseProto.Builder builder = + GetBatchedListingResponseProto.newBuilder(); + for (HdfsPartialListing partialListing : result.getListings()) { + HdfsProtos.BatchedDirectoryListingProto.Builder listingBuilder = + HdfsProtos.BatchedDirectoryListingProto.newBuilder(); + if (partialListing.getException() != null) { + RemoteException ex = partialListing.getException(); + HdfsProtos.RemoteExceptionProto.Builder rexBuilder = + HdfsProtos.RemoteExceptionProto.newBuilder(); + rexBuilder.setClassName(ex.getClassName()); + if (ex.getMessage() != null) { + rexBuilder.setMessage(ex.getMessage()); + } + listingBuilder.setException(rexBuilder.build()); + } else { + for (HdfsFileStatus f : partialListing.getPartialListing()) { + listingBuilder.addPartialListing(PBHelperClient.convert(f)); + } + } + listingBuilder.setParentIdx(partialListing.getParentIdx()); + builder.addListings(listingBuilder); + } + builder.setHasMore(result.hasMore()); + builder.setStartAfter(ByteString.copyFrom(result.getStartAfter())); + return builder.build(); + } else { + return VOID_GETBATCHEDLISTING_RESPONSE; + } + }); + return null; + } + + @Override + public RenewLeaseResponseProto renewLease( + RpcController controller, + RenewLeaseRequestProto req) { + asyncRouterServer(() -> { + server.renewLease(req.getClientName(), req.getNamespacesList()); + return null; + }, result -> VOID_RENEWLEASE_RESPONSE); + return null; + } + + @Override + public RecoverLeaseResponseProto recoverLease( + RpcController controller, + RecoverLeaseRequestProto req) { + asyncRouterServer(() -> server.recoverLease(req.getSrc(), req.getClientName()), + result -> RecoverLeaseResponseProto.newBuilder() + .setResult(result).build()); + return null; + } + + @Override + public RestoreFailedStorageResponseProto restoreFailedStorage( + RpcController controller, RestoreFailedStorageRequestProto req) { + asyncRouterServer(() -> server.restoreFailedStorage(req.getArg()), + result -> RestoreFailedStorageResponseProto.newBuilder().setResult(result) + .build()); + return null; + } + + @Override + public GetFsStatsResponseProto getFsStats( + RpcController controller, + GetFsStatusRequestProto req) { + asyncRouterServer(server::getStats, PBHelperClient::convert); + return null; + } + + @Override + public GetFsReplicatedBlockStatsResponseProto getFsReplicatedBlockStats( + RpcController controller, GetFsReplicatedBlockStatsRequestProto request) { + asyncRouterServer(server::getReplicatedBlockStats, PBHelperClient::convert); + return null; + } + + @Override + public GetFsECBlockGroupStatsResponseProto getFsECBlockGroupStats( + RpcController controller, GetFsECBlockGroupStatsRequestProto request) { + asyncRouterServer(server::getECBlockGroupStats, PBHelperClient::convert); + return null; + } + + @Override + public GetDatanodeReportResponseProto getDatanodeReport( + RpcController controller, GetDatanodeReportRequestProto req) { + asyncRouterServer(() -> server.getDatanodeReport(PBHelperClient.convert(req.getType())), + result -> { + List re = PBHelperClient.convert(result); + return GetDatanodeReportResponseProto.newBuilder() + .addAllDi(re).build(); + }); + return null; + } + + @Override + public GetDatanodeStorageReportResponseProto getDatanodeStorageReport( + RpcController controller, GetDatanodeStorageReportRequestProto req) { + asyncRouterServer(() -> server.getDatanodeStorageReport(PBHelperClient.convert(req.getType())), + result -> { + List reports = + PBHelperClient.convertDatanodeStorageReports(result); + return GetDatanodeStorageReportResponseProto.newBuilder() + .addAllDatanodeStorageReports(reports) + .build(); + }); + return null; + } + + @Override + public GetPreferredBlockSizeResponseProto getPreferredBlockSize( + RpcController controller, GetPreferredBlockSizeRequestProto req) { + asyncRouterServer(() -> server.getPreferredBlockSize(req.getFilename()), + result -> GetPreferredBlockSizeResponseProto.newBuilder().setBsize(result) + .build()); + return null; + } + + @Override + public SetSafeModeResponseProto setSafeMode( + RpcController controller, + SetSafeModeRequestProto req) { + asyncRouterServer(() -> server.setSafeMode(PBHelperClient.convert(req.getAction()), + req.getChecked()), + result -> SetSafeModeResponseProto.newBuilder().setResult(result).build()); + return null; + } + + @Override + public SaveNamespaceResponseProto saveNamespace( + RpcController controller, + SaveNamespaceRequestProto req) { + asyncRouterServer(() -> { + final long timeWindow = req.hasTimeWindow() ? req.getTimeWindow() : 0; + final long txGap = req.hasTxGap() ? req.getTxGap() : 0; + return server.saveNamespace(timeWindow, txGap); + }, result -> SaveNamespaceResponseProto.newBuilder().setSaved(result).build()); + return null; + } + + @Override + public RollEditsResponseProto rollEdits( + RpcController controller, + RollEditsRequestProto request) { + asyncRouterServer(server::rollEdits, + txid -> RollEditsResponseProto.newBuilder() + .setNewSegmentTxId(txid) + .build()); + return null; + } + + + @Override + public RefreshNodesResponseProto refreshNodes( + RpcController controller, + RefreshNodesRequestProto req) { + asyncRouterServer(() -> { + server.refreshNodes(); + return null; + }, result -> VOID_REFRESHNODES_RESPONSE); + return null; + } + + @Override + public FinalizeUpgradeResponseProto finalizeUpgrade( + RpcController controller, + FinalizeUpgradeRequestProto req) { + asyncRouterServer(() -> { + server.finalizeUpgrade(); + return null; + }, result -> VOID_REFRESHNODES_RESPONSE); + return null; + } + + @Override + public UpgradeStatusResponseProto upgradeStatus( + RpcController controller, UpgradeStatusRequestProto req) { + asyncRouterServer(server::upgradeStatus, + result -> { + UpgradeStatusResponseProto.Builder b = + UpgradeStatusResponseProto.newBuilder(); + b.setUpgradeFinalized(result); + return b.build(); + }); + return null; + } + + @Override + public RollingUpgradeResponseProto rollingUpgrade( + RpcController controller, + RollingUpgradeRequestProto req) { + asyncRouterServer(() -> + server.rollingUpgrade(PBHelperClient.convert(req.getAction())), + info -> { + final RollingUpgradeResponseProto.Builder b = + RollingUpgradeResponseProto.newBuilder(); + if (info != null) { + b.setRollingUpgradeInfo(PBHelperClient.convert(info)); + } + return b.build(); + }); + return null; + } + + @Override + public ListCorruptFileBlocksResponseProto listCorruptFileBlocks( + RpcController controller, ListCorruptFileBlocksRequestProto req) { + asyncRouterServer(() -> server.listCorruptFileBlocks( + req.getPath(), req.hasCookie() ? req.getCookie(): null), + result -> ListCorruptFileBlocksResponseProto.newBuilder() + .setCorrupt(PBHelperClient.convert(result)) + .build()); + return null; + } + + @Override + public MetaSaveResponseProto metaSave( + RpcController controller, + MetaSaveRequestProto req) { + asyncRouterServer(() -> { + server.metaSave(req.getFilename()); + return null; + }, result -> VOID_METASAVE_RESPONSE); + return null; + } + + @Override + public GetFileInfoResponseProto getFileInfo( + RpcController controller, + GetFileInfoRequestProto req) { + asyncRouterServer(() -> server.getFileInfo(req.getSrc()), + result -> { + if (result != null) { + return GetFileInfoResponseProto.newBuilder().setFs( + PBHelperClient.convert(result)).build(); + } + return VOID_GETFILEINFO_RESPONSE; + }); + return null; + } + + @Override + public GetLocatedFileInfoResponseProto getLocatedFileInfo( + RpcController controller, GetLocatedFileInfoRequestProto req) { + asyncRouterServer(() -> server.getLocatedFileInfo(req.getSrc(), + req.getNeedBlockToken()), + result -> { + if (result != null) { + return GetLocatedFileInfoResponseProto.newBuilder().setFs( + PBHelperClient.convert(result)).build(); + } + return VOID_GETLOCATEDFILEINFO_RESPONSE; + }); + return null; + } + + @Override + public GetFileLinkInfoResponseProto getFileLinkInfo( + RpcController controller, + GetFileLinkInfoRequestProto req) { + asyncRouterServer(() -> server.getFileLinkInfo(req.getSrc()), + result -> { + if (result != null) { + return GetFileLinkInfoResponseProto.newBuilder().setFs( + PBHelperClient.convert(result)).build(); + } else { + return VOID_GETFILELINKINFO_RESPONSE; + } + }); + return null; + } + + @Override + public GetContentSummaryResponseProto getContentSummary( + RpcController controller, GetContentSummaryRequestProto req) { + asyncRouterServer(() -> server.getContentSummary(req.getPath()), + result -> GetContentSummaryResponseProto.newBuilder() + .setSummary(PBHelperClient.convert(result)).build()); + return null; + } + + @Override + public SetQuotaResponseProto setQuota( + RpcController controller, + SetQuotaRequestProto req) { + asyncRouterServer(() -> { + server.setQuota(req.getPath(), req.getNamespaceQuota(), + req.getStoragespaceQuota(), + req.hasStorageType() ? + PBHelperClient.convertStorageType(req.getStorageType()): null); + return null; + }, result -> VOID_SETQUOTA_RESPONSE); + return null; + } + + @Override + public FsyncResponseProto fsync( + RpcController controller, + FsyncRequestProto req) { + asyncRouterServer(() -> { + server.fsync(req.getSrc(), req.getFileId(), + req.getClient(), req.getLastBlockLength()); + return null; + }, result -> VOID_FSYNC_RESPONSE); + return null; + } + + @Override + public SetTimesResponseProto setTimes( + RpcController controller, + SetTimesRequestProto req) { + asyncRouterServer(() -> { + server.setTimes(req.getSrc(), req.getMtime(), req.getAtime()); + return null; + }, result -> VOID_SETTIMES_RESPONSE); + return null; + } + + @Override + public CreateSymlinkResponseProto createSymlink( + RpcController controller, + CreateSymlinkRequestProto req) { + asyncRouterServer(() -> { + server.createSymlink(req.getTarget(), req.getLink(), + PBHelperClient.convert(req.getDirPerm()), req.getCreateParent()); + return null; + }, result -> VOID_CREATESYMLINK_RESPONSE); + return null; + } + + @Override + public GetLinkTargetResponseProto getLinkTarget( + RpcController controller, + GetLinkTargetRequestProto req) { + asyncRouterServer(() -> server.getLinkTarget(req.getPath()), + result -> { + GetLinkTargetResponseProto.Builder builder = + GetLinkTargetResponseProto + .newBuilder(); + if (result != null) { + builder.setTargetPath(result); + } + return builder.build(); + }); + return null; + } + + @Override + public UpdateBlockForPipelineResponseProto updateBlockForPipeline( + RpcController controller, UpdateBlockForPipelineRequestProto req) { + asyncRouterServer(() -> server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()), + req.getClientName()), + result -> { + HdfsProtos.LocatedBlockProto res = PBHelperClient.convertLocatedBlock(result); + return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(res) + .build(); + }); + return null; + } + + @Override + public UpdatePipelineResponseProto updatePipeline( + RpcController controller, + UpdatePipelineRequestProto req) { + asyncRouterServer(() -> { + List newNodes = req.getNewNodesList(); + List newStorageIDs = req.getStorageIDsList(); + server.updatePipeline(req.getClientName(), + PBHelperClient.convert(req.getOldBlock()), + PBHelperClient.convert(req.getNewBlock()), + PBHelperClient.convert(newNodes.toArray(new HdfsProtos.DatanodeIDProto[newNodes.size()])), + newStorageIDs.toArray(new String[newStorageIDs.size()])); + return null; + }, result -> VOID_UPDATEPIPELINE_RESPONSE); + return null; + } + + @Override + public GetDelegationTokenResponseProto getDelegationToken( + RpcController controller, GetDelegationTokenRequestProto req) { + asyncRouterServer(() -> server + .getDelegationToken(new Text(req.getRenewer())), + token -> { + GetDelegationTokenResponseProto.Builder rspBuilder = + GetDelegationTokenResponseProto.newBuilder(); + if (token != null) { + rspBuilder.setToken(PBHelperClient.convert(token)); + } + return rspBuilder.build(); + }); + return null; + } + + @Override + public RenewDelegationTokenResponseProto renewDelegationToken( + RpcController controller, RenewDelegationTokenRequestProto req) { + asyncRouterServer(() -> server.renewDelegationToken(PBHelperClient + .convertDelegationToken(req.getToken())), + result -> RenewDelegationTokenResponseProto.newBuilder() + .setNewExpiryTime(result).build()); + return null; + } + + @Override + public CancelDelegationTokenResponseProto cancelDelegationToken( + RpcController controller, CancelDelegationTokenRequestProto req) { + asyncRouterServer(() -> { + server.cancelDelegationToken(PBHelperClient.convertDelegationToken(req + .getToken())); + return null; + }, result -> VOID_CANCELDELEGATIONTOKEN_RESPONSE); + return null; + } + + @Override + public SetBalancerBandwidthResponseProto setBalancerBandwidth( + RpcController controller, SetBalancerBandwidthRequestProto req) { + asyncRouterServer(() -> { + server.setBalancerBandwidth(req.getBandwidth()); + return null; + }, result -> VOID_SETBALANCERBANDWIDTH_RESPONSE); + return null; + } + + @Override + public GetDataEncryptionKeyResponseProto getDataEncryptionKey( + RpcController controller, GetDataEncryptionKeyRequestProto request) { + asyncRouterServer(server::getDataEncryptionKey, encryptionKey -> { + GetDataEncryptionKeyResponseProto.Builder builder = + GetDataEncryptionKeyResponseProto.newBuilder(); + if (encryptionKey != null) { + builder.setDataEncryptionKey(PBHelperClient.convert(encryptionKey)); + } + return builder.build(); + }); + return null; + } + + @Override + public CreateSnapshotResponseProto createSnapshot( + RpcController controller, + CreateSnapshotRequestProto req) throws ServiceException { + asyncRouterServer(() -> server.createSnapshot(req.getSnapshotRoot(), + req.hasSnapshotName()? req.getSnapshotName(): null), + snapshotPath -> { + final CreateSnapshotResponseProto.Builder builder + = CreateSnapshotResponseProto.newBuilder(); + if (snapshotPath != null) { + builder.setSnapshotPath(snapshotPath); + } + return builder.build(); + }); + return null; + } + + @Override + public DeleteSnapshotResponseProto deleteSnapshot( + RpcController controller, + DeleteSnapshotRequestProto req) { + asyncRouterServer(() -> { + server.deleteSnapshot(req.getSnapshotRoot(), req.getSnapshotName()); + return null; + }, result -> VOID_DELETE_SNAPSHOT_RESPONSE); + return null; + } + + @Override + public AllowSnapshotResponseProto allowSnapshot( + RpcController controller, + AllowSnapshotRequestProto req) { + asyncRouterServer(() -> { + server.allowSnapshot(req.getSnapshotRoot()); + return null; + }, result -> VOID_ALLOW_SNAPSHOT_RESPONSE); + return null; + } + + @Override + public DisallowSnapshotResponseProto disallowSnapshot( + RpcController controller, + DisallowSnapshotRequestProto req) { + asyncRouterServer(() -> { + server.disallowSnapshot(req.getSnapshotRoot()); + return null; + }, result -> VOID_DISALLOW_SNAPSHOT_RESPONSE); + return null; + } + + @Override + public RenameSnapshotResponseProto renameSnapshot( + RpcController controller, + RenameSnapshotRequestProto request) { + asyncRouterServer(() -> { + server.renameSnapshot(request.getSnapshotRoot(), + request.getSnapshotOldName(), request.getSnapshotNewName()); + return null; + }, result -> VOID_RENAME_SNAPSHOT_RESPONSE); + return null; + } + + @Override + public GetSnapshottableDirListingResponseProto getSnapshottableDirListing( + RpcController controller, GetSnapshottableDirListingRequestProto request) { + asyncRouterServer(server::getSnapshottableDirListing, + result -> { + if (result != null) { + return GetSnapshottableDirListingResponseProto.newBuilder(). + setSnapshottableDirList(PBHelperClient.convert(result)).build(); + } else { + return NULL_GET_SNAPSHOTTABLE_DIR_LISTING_RESPONSE; + } + }); + return null; + } + + @Override + public GetSnapshotListingResponseProto getSnapshotListing( + RpcController controller, GetSnapshotListingRequestProto request) { + asyncRouterServer(() -> server + .getSnapshotListing(request.getSnapshotRoot()), + result -> { + if (result != null) { + return GetSnapshotListingResponseProto.newBuilder(). + setSnapshotList(PBHelperClient.convert(result)).build(); + } else { + return NULL_GET_SNAPSHOT_LISTING_RESPONSE; + } + }); + return null; + } + + @Override + public GetSnapshotDiffReportResponseProto getSnapshotDiffReport( + RpcController controller, GetSnapshotDiffReportRequestProto request) { + asyncRouterServer(() -> server.getSnapshotDiffReport( + request.getSnapshotRoot(), request.getFromSnapshot(), + request.getToSnapshot()), + report -> GetSnapshotDiffReportResponseProto.newBuilder() + .setDiffReport(PBHelperClient.convert(report)).build()); + return null; + } + + @Override + public GetSnapshotDiffReportListingResponseProto getSnapshotDiffReportListing( + RpcController controller, + GetSnapshotDiffReportListingRequestProto request) { + asyncRouterServer(() -> server + .getSnapshotDiffReportListing(request.getSnapshotRoot(), + request.getFromSnapshot(), request.getToSnapshot(), + request.getCursor().getStartPath().toByteArray(), + request.getCursor().getIndex()), + report -> GetSnapshotDiffReportListingResponseProto.newBuilder() + .setDiffReport(PBHelperClient.convert(report)).build()); + return null; + } + + @Override + public IsFileClosedResponseProto isFileClosed( + RpcController controller, IsFileClosedRequestProto request) { + asyncRouterServer(() -> server.isFileClosed(request.getSrc()), + result -> IsFileClosedResponseProto.newBuilder() + .setResult(result).build()); + return null; + } + + @Override + public AddCacheDirectiveResponseProto addCacheDirective( + RpcController controller, AddCacheDirectiveRequestProto request) { + asyncRouterServer(() -> server.addCacheDirective( + PBHelperClient.convert(request.getInfo()), + PBHelperClient.convertCacheFlags(request.getCacheFlags())), + id -> AddCacheDirectiveResponseProto.newBuilder(). + setId(id).build()); + return null; + } + + @Override + public ModifyCacheDirectiveResponseProto modifyCacheDirective( + RpcController controller, ModifyCacheDirectiveRequestProto request) { + asyncRouterServer(() -> { + server.modifyCacheDirective( + PBHelperClient.convert(request.getInfo()), + PBHelperClient.convertCacheFlags(request.getCacheFlags())); + return null; + }, result -> ModifyCacheDirectiveResponseProto.newBuilder().build()); + return null; + } + + @Override + public RemoveCacheDirectiveResponseProto removeCacheDirective( + RpcController controller, + RemoveCacheDirectiveRequestProto request) { + asyncRouterServer(() -> { + server.removeCacheDirective(request.getId()); + return null; + }, result -> RemoveCacheDirectiveResponseProto. + newBuilder().build()); + return null; + } + + @Override + public ListCacheDirectivesResponseProto listCacheDirectives( + RpcController controller, ListCacheDirectivesRequestProto request) { + asyncRouterServer(() -> { + CacheDirectiveInfo filter = + PBHelperClient.convert(request.getFilter()); + return server.listCacheDirectives(request.getPrevId(), filter); + }, entries -> { + ListCacheDirectivesResponseProto.Builder builder = + ListCacheDirectivesResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i=0, n=entries.size(); i { + server.addCachePool(PBHelperClient.convert(request.getInfo())); + return null; + }, result -> AddCachePoolResponseProto.newBuilder().build()); + return null; + } + + @Override + public ModifyCachePoolResponseProto modifyCachePool( + RpcController controller, + ModifyCachePoolRequestProto request) { + asyncRouterServer(() -> { + server.modifyCachePool(PBHelperClient.convert(request.getInfo())); + return null; + }, result -> ModifyCachePoolResponseProto.newBuilder().build()); + return null; + } + + @Override + public RemoveCachePoolResponseProto removeCachePool( + RpcController controller, + RemoveCachePoolRequestProto request) { + asyncRouterServer(() -> { + server.removeCachePool(request.getPoolName()); + return null; + }, result -> RemoveCachePoolResponseProto.newBuilder().build()); + return null; + } + + @Override + public ListCachePoolsResponseProto listCachePools( + RpcController controller, + ListCachePoolsRequestProto request) { + asyncRouterServer(() -> server.listCachePools(request.getPrevPoolName()), + entries -> { + ListCachePoolsResponseProto.Builder responseBuilder = + ListCachePoolsResponseProto.newBuilder(); + responseBuilder.setHasMore(entries.hasMore()); + for (int i=0, n=entries.size(); i { + server.modifyAclEntries(req.getSrc(), PBHelperClient.convertAclEntry(req.getAclSpecList())); + return null; + }, vo -> VOID_MODIFYACLENTRIES_RESPONSE); + return null; + } + + @Override + public RemoveAclEntriesResponseProto removeAclEntries( + RpcController controller, RemoveAclEntriesRequestProto req) { + asyncRouterServer(() -> { + server.removeAclEntries(req.getSrc(), + PBHelperClient.convertAclEntry(req.getAclSpecList())); + return null; + }, vo -> VOID_REMOVEACLENTRIES_RESPONSE); + return null; + } + + @Override + public RemoveDefaultAclResponseProto removeDefaultAcl( + RpcController controller, RemoveDefaultAclRequestProto req) { + asyncRouterServer(() -> { + server.removeDefaultAcl(req.getSrc()); + return null; + }, vo -> VOID_REMOVEDEFAULTACL_RESPONSE); + return null; + } + + @Override + public RemoveAclResponseProto removeAcl( + RpcController controller, + RemoveAclRequestProto req) { + asyncRouterServer(() -> { + server.removeAcl(req.getSrc()); + return null; + }, vo -> VOID_REMOVEACL_RESPONSE); + return null; + } + + @Override + public SetAclResponseProto setAcl( + RpcController controller, + SetAclRequestProto req) { + asyncRouterServer(() -> { + server.setAcl(req.getSrc(), PBHelperClient.convertAclEntry(req.getAclSpecList())); + return null; + }, vo -> VOID_SETACL_RESPONSE); + return null; + } + + @Override + public GetAclStatusResponseProto getAclStatus( + RpcController controller, + GetAclStatusRequestProto req) { + asyncRouterServer(() -> server.getAclStatus(req.getSrc()), + PBHelperClient::convert); + return null; + } + + @Override + public CreateEncryptionZoneResponseProto createEncryptionZone( + RpcController controller, CreateEncryptionZoneRequestProto req) { + asyncRouterServer(() -> { + server.createEncryptionZone(req.getSrc(), req.getKeyName()); + return null; + }, vo -> CreateEncryptionZoneResponseProto.newBuilder().build()); + return null; + } + + @Override + public GetEZForPathResponseProto getEZForPath( + RpcController controller, GetEZForPathRequestProto req) { + asyncRouterServer(() -> server.getEZForPath(req.getSrc()), + ret -> { + GetEZForPathResponseProto.Builder builder = + GetEZForPathResponseProto.newBuilder(); + if (ret != null) { + builder.setZone(PBHelperClient.convert(ret)); + } + return builder.build(); + }); + return null; + } + + @Override + public ListEncryptionZonesResponseProto listEncryptionZones( + RpcController controller, ListEncryptionZonesRequestProto req) { + asyncRouterServer(() -> server.listEncryptionZones(req.getId()), + entries -> { + ListEncryptionZonesResponseProto.Builder builder = + ListEncryptionZonesResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i=0; i { + server.reencryptEncryptionZone(req.getZone(), + PBHelperClient.convert(req.getAction())); + return null; + }, vo -> ReencryptEncryptionZoneResponseProto.newBuilder().build()); + return null; + } + + public ListReencryptionStatusResponseProto listReencryptionStatus( + RpcController controller, ListReencryptionStatusRequestProto req) { + asyncRouterServer(() -> server.listReencryptionStatus(req.getId()), + entries -> { + ListReencryptionStatusResponseProto.Builder builder = + ListReencryptionStatusResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i=0; i { + String ecPolicyName = req.hasEcPolicyName() ? + req.getEcPolicyName() : null; + server.setErasureCodingPolicy(req.getSrc(), ecPolicyName); + return null; + }, vo -> SetErasureCodingPolicyResponseProto.newBuilder().build()); + return null; + } + + @Override + public UnsetErasureCodingPolicyResponseProto unsetErasureCodingPolicy( + RpcController controller, UnsetErasureCodingPolicyRequestProto req) { + asyncRouterServer(() -> { + server.unsetErasureCodingPolicy(req.getSrc()); + return null; + }, vo -> UnsetErasureCodingPolicyResponseProto.newBuilder().build()); + return null; + } + + @Override + public GetECTopologyResultForPoliciesResponseProto getECTopologyResultForPolicies( + RpcController controller, GetECTopologyResultForPoliciesRequestProto req) { + asyncRouterServer(() -> { + ProtocolStringList policies = req.getPoliciesList(); + return server.getECTopologyResultForPolicies( + policies.toArray(policies.toArray(new String[policies.size()]))); + }, result -> { + GetECTopologyResultForPoliciesResponseProto.Builder builder = + GetECTopologyResultForPoliciesResponseProto.newBuilder(); + builder + .setResponse(PBHelperClient.convertECTopologyVerifierResult(result)); + return builder.build(); + }); + return null; + } + + @Override + public SetXAttrResponseProto setXAttr( + RpcController controller, + SetXAttrRequestProto req) { + asyncRouterServer(() -> { + server.setXAttr(req.getSrc(), PBHelperClient.convertXAttr(req.getXAttr()), + PBHelperClient.convert(req.getFlag())); + return null; + }, vo -> VOID_SETXATTR_RESPONSE); + return null; + } + + @Override + public GetXAttrsResponseProto getXAttrs( + RpcController controller, + GetXAttrsRequestProto req) { + asyncRouterServer(() -> server.getXAttrs(req.getSrc(), + PBHelperClient.convertXAttrs(req.getXAttrsList())), + PBHelperClient::convertXAttrsResponse); + return null; + } + + @Override + public ListXAttrsResponseProto listXAttrs( + RpcController controller, + ListXAttrsRequestProto req) { + asyncRouterServer(() -> server.listXAttrs(req.getSrc()), + PBHelperClient::convertListXAttrsResponse); + return null; + } + + @Override + public RemoveXAttrResponseProto removeXAttr( + RpcController controller, + RemoveXAttrRequestProto req) { + asyncRouterServer(() -> { + server.removeXAttr(req.getSrc(), PBHelperClient.convertXAttr(req.getXAttr())); + return null; + }, vo -> VOID_REMOVEXATTR_RESPONSE); + return null; + } + + @Override + public CheckAccessResponseProto checkAccess( + RpcController controller, + CheckAccessRequestProto req) { + asyncRouterServer(() -> { + server.checkAccess(req.getPath(), PBHelperClient.convert(req.getMode())); + return null; + }, vo -> VOID_CHECKACCESS_RESPONSE); + return null; + } + + @Override + public SetStoragePolicyResponseProto setStoragePolicy( + RpcController controller, SetStoragePolicyRequestProto request) { + asyncRouterServer(() -> { + server.setStoragePolicy(request.getSrc(), request.getPolicyName()); + return null; + }, vo -> VOID_SET_STORAGE_POLICY_RESPONSE); + return null; + } + + @Override + public UnsetStoragePolicyResponseProto unsetStoragePolicy( + RpcController controller, UnsetStoragePolicyRequestProto request) { + asyncRouterServer(() -> { + server.unsetStoragePolicy(request.getSrc()); + return null; + }, vo -> VOID_UNSET_STORAGE_POLICY_RESPONSE); + return null; + } + + @Override + public GetStoragePolicyResponseProto getStoragePolicy( + RpcController controller, GetStoragePolicyRequestProto request) { + asyncRouterServer(() -> server.getStoragePolicy(request.getPath()), + result -> { + HdfsProtos.BlockStoragePolicyProto policy = PBHelperClient.convert(result); + return GetStoragePolicyResponseProto.newBuilder() + .setStoragePolicy(policy).build(); + }); + return null; + } + + @Override + public GetStoragePoliciesResponseProto getStoragePolicies( + RpcController controller, GetStoragePoliciesRequestProto request) { + asyncRouterServer(server::getStoragePolicies, + policies -> { + GetStoragePoliciesResponseProto.Builder builder = + GetStoragePoliciesResponseProto.newBuilder(); + if (policies == null) { + return builder.build(); + } + for (BlockStoragePolicy policy : policies) { + builder.addPolicies(PBHelperClient.convert(policy)); + } + return builder.build(); + }); + return null; + } + + public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid( + RpcController controller, + GetCurrentEditLogTxidRequestProto req) { + asyncRouterServer(server::getCurrentEditLogTxid, + result -> GetCurrentEditLogTxidResponseProto.newBuilder() + .setTxid(result).build()); + return null; + } + + @Override + public GetEditsFromTxidResponseProto getEditsFromTxid( + RpcController controller, + GetEditsFromTxidRequestProto req) { + asyncRouterServer(() -> server.getEditsFromTxid(req.getTxid()), + PBHelperClient::convertEditsResponse); + return null; + } + + @Override + public GetErasureCodingPoliciesResponseProto getErasureCodingPolicies( + RpcController controller, + GetErasureCodingPoliciesRequestProto request) { + asyncRouterServer(server::getErasureCodingPolicies, + ecpInfos -> { + GetErasureCodingPoliciesResponseProto.Builder resBuilder = + GetErasureCodingPoliciesResponseProto + .newBuilder(); + for (ErasureCodingPolicyInfo info : ecpInfos) { + resBuilder.addEcPolicies( + PBHelperClient.convertErasureCodingPolicy(info)); + } + return resBuilder.build(); + }); + return null; + } + + @Override + public GetErasureCodingCodecsResponseProto getErasureCodingCodecs( + RpcController controller, GetErasureCodingCodecsRequestProto request) { + asyncRouterServer(server::getErasureCodingCodecs, + codecs -> { + GetErasureCodingCodecsResponseProto.Builder resBuilder = + GetErasureCodingCodecsResponseProto.newBuilder(); + for (Map.Entry codec : codecs.entrySet()) { + resBuilder.addCodec( + PBHelperClient.convertErasureCodingCodec( + codec.getKey(), codec.getValue())); + } + return resBuilder.build(); + }); + return null; + } + + @Override + public AddErasureCodingPoliciesResponseProto addErasureCodingPolicies( + RpcController controller, AddErasureCodingPoliciesRequestProto request) { + asyncRouterServer(() -> { + ErasureCodingPolicy[] policies = request.getEcPoliciesList().stream() + .map(PBHelperClient::convertErasureCodingPolicy) + .toArray(ErasureCodingPolicy[]::new); + return server + .addErasureCodingPolicies(policies); + }, result -> { + List responseProtos = + Arrays.stream(result) + .map(PBHelperClient::convertAddErasureCodingPolicyResponse) + .collect(Collectors.toList()); + AddErasureCodingPoliciesResponseProto response = + AddErasureCodingPoliciesResponseProto.newBuilder() + .addAllResponses(responseProtos).build(); + return response; + }); + return null; + } + + @Override + public RemoveErasureCodingPolicyResponseProto removeErasureCodingPolicy( + RpcController controller, RemoveErasureCodingPolicyRequestProto request) { + asyncRouterServer(() -> { + server.removeErasureCodingPolicy(request.getEcPolicyName()); + return null; + }, vo -> RemoveErasureCodingPolicyResponseProto.newBuilder().build()); + return null; + } + + @Override + public EnableErasureCodingPolicyResponseProto enableErasureCodingPolicy( + RpcController controller, EnableErasureCodingPolicyRequestProto request) { + asyncRouterServer(() -> { + server.enableErasureCodingPolicy(request.getEcPolicyName()); + return null; + }, vo -> EnableErasureCodingPolicyResponseProto.newBuilder().build()); + return null; + } + + @Override + public DisableErasureCodingPolicyResponseProto disableErasureCodingPolicy( + RpcController controller, DisableErasureCodingPolicyRequestProto request) { + asyncRouterServer(() -> { + server.disableErasureCodingPolicy(request.getEcPolicyName()); + return null; + }, vo -> DisableErasureCodingPolicyResponseProto.newBuilder().build()); + return null; + } + + @Override + public GetErasureCodingPolicyResponseProto getErasureCodingPolicy( + RpcController controller, + GetErasureCodingPolicyRequestProto request) { + asyncRouterServer(() -> server.getErasureCodingPolicy(request.getSrc()), + ecPolicy -> { + GetErasureCodingPolicyResponseProto.Builder builder = + GetErasureCodingPolicyResponseProto.newBuilder(); + if (ecPolicy != null) { + builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy)); + } + return builder.build(); + }); + return null; + } + + @Override + public GetQuotaUsageResponseProto getQuotaUsage( + RpcController controller, GetQuotaUsageRequestProto req) { + asyncRouterServer(() -> server.getQuotaUsage(req.getPath()), + result -> GetQuotaUsageResponseProto.newBuilder() + .setUsage(PBHelperClient.convert(result)).build()); + return null; + } + + @Override + public ListOpenFilesResponseProto listOpenFiles( + RpcController controller, + ListOpenFilesRequestProto req) { + asyncRouterServer(() -> { + EnumSet openFilesTypes = + PBHelperClient.convertOpenFileTypes(req.getTypesList()); + return server.listOpenFiles(req.getId(), + openFilesTypes, req.getPath()); + }, entries -> { + ListOpenFilesResponseProto.Builder builder = + ListOpenFilesResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i = 0; i < entries.size(); i++) { + builder.addEntries(PBHelperClient.convert(entries.get(i))); + } + builder.addAllTypes(req.getTypesList()); + return builder.build(); + }); + return null; + } + + @Override + public MsyncResponseProto msync(RpcController controller, MsyncRequestProto req) { + asyncRouterServer(() -> { + server.msync(); + return null; + }, vo -> MsyncResponseProto.newBuilder().build()); + return null; + } + + @Override + public SatisfyStoragePolicyResponseProto satisfyStoragePolicy( + RpcController controller, SatisfyStoragePolicyRequestProto request) { + asyncRouterServer(() -> { + server.satisfyStoragePolicy(request.getSrc()); + return null; + }, vo -> VOID_SATISFYSTORAGEPOLICY_RESPONSE); + return null; + } + + @Override + public HAServiceStateResponseProto getHAServiceState( + RpcController controller, + HAServiceStateRequestProto request) { + asyncRouterServer(server::getHAServiceState, + state -> { + HAServiceProtocolProtos.HAServiceStateProto retState; + switch (state) { + case ACTIVE: + retState = HAServiceProtocolProtos.HAServiceStateProto.ACTIVE; + break; + case STANDBY: + retState = HAServiceProtocolProtos.HAServiceStateProto.STANDBY; + break; + case OBSERVER: + retState = HAServiceProtocolProtos.HAServiceStateProto.OBSERVER; + break; + case INITIALIZING: + default: + retState = HAServiceProtocolProtos.HAServiceStateProto.INITIALIZING; + break; + } + HAServiceStateResponseProto.Builder builder = + HAServiceStateResponseProto.newBuilder(); + builder.setState(retState); + return builder.build(); + }); + return null; + } + + @Override + public GetSlowDatanodeReportResponseProto getSlowDatanodeReport( + RpcController controller, + GetSlowDatanodeReportRequestProto request) { + asyncRouterServer(server::getSlowDatanodeReport, + res -> { + List result = + PBHelperClient.convert(res); + return GetSlowDatanodeReportResponseProto.newBuilder() + .addAllDatanodeInfoProto(result) + .build(); + }); + return null; + } + + @Override + public GetEnclosingRootResponseProto getEnclosingRoot( + RpcController controller, GetEnclosingRootRequestProto req) { + asyncRouterServer(() -> server.getEnclosingRoot(req.getFilename()), + enclosingRootPath -> ClientNamenodeProtocolProtos + .GetEnclosingRootResponseProto.newBuilder() + .setEnclosingRootPath(enclosingRootPath.toUri().toString()) + .build()); + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000000..9f607f08f18b3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterGetUserMappingsProtocolServerSideTranslatorPB.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.thirdparty.protobuf.RpcController; +import org.apache.hadoop.thirdparty.protobuf.ServiceException; +import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto; +import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer; + +public class RouterGetUserMappingsProtocolServerSideTranslatorPB + extends GetUserMappingsProtocolServerSideTranslatorPB { + private final RouterRpcServer server; + private final boolean isAsyncRpc; + + public RouterGetUserMappingsProtocolServerSideTranslatorPB(GetUserMappingsProtocol impl) { + super(impl); + this.server = (RouterRpcServer) impl; + this.isAsyncRpc = server.isAsync(); + } + + @Override + public GetGroupsForUserResponseProto getGroupsForUser( + RpcController controller, + GetGroupsForUserRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.getGroupsForUser(controller, request); + } + asyncRouterServer(() -> server.getGroupsForUser(request.getUser()), groups -> { + GetGroupsForUserResponseProto.Builder builder = + GetGroupsForUserResponseProto + .newBuilder(); + for (String g : groups) { + builder.addGroups(g); + } + return builder.build(); + }); + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000000..d706c1148e1b8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterNamenodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.thirdparty.protobuf.RpcController; +import org.apache.hadoop.thirdparty.protobuf.ServiceException; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer; + +public class RouterNamenodeProtocolServerSideTranslatorPB + extends NamenodeProtocolServerSideTranslatorPB { + + private final RouterRpcServer server; + private final boolean isAsyncRpc; + + public RouterNamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) { + super(impl); + this.server = (RouterRpcServer) impl; + this.isAsyncRpc = server.isAsync(); + } + + @Override + public GetBlocksResponseProto getBlocks(RpcController unused, + GetBlocksRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.getBlocks(unused, request); + } + asyncRouterServer(() -> { + DatanodeInfo dnInfo = new DatanodeInfo.DatanodeInfoBuilder() + .setNodeID(PBHelperClient.convert(request.getDatanode())) + .build(); + return server.getBlocks(dnInfo, request.getSize(), + request.getMinBlockSize(), request.getTimeInterval(), + request.hasStorageType() ? + PBHelperClient.convertStorageType(request.getStorageType()): null); + }, blocks -> + GetBlocksResponseProto.newBuilder() + .setBlocks(PBHelper.convert(blocks)).build()); + return null; + } + + @Override + public GetBlockKeysResponseProto getBlockKeys(RpcController unused, + GetBlockKeysRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.getBlockKeys(unused, request); + } + asyncRouterServer(server::getBlockKeys, keys -> { + GetBlockKeysResponseProto.Builder builder = + GetBlockKeysResponseProto.newBuilder(); + if (keys != null) { + builder.setKeys(PBHelper.convert(keys)); + } + return builder.build(); + }); + return null; + } + + @Override + public GetTransactionIdResponseProto getTransactionId(RpcController unused, + GetTransactionIdRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.getTransactionId(unused, request); + } + asyncRouterServer(server::getTransactionID, + txid -> GetTransactionIdResponseProto + .newBuilder().setTxId(txid).build()); + return null; + } + + @Override + public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId( + RpcController unused, GetMostRecentCheckpointTxIdRequestProto request) + throws ServiceException { + if (!isAsyncRpc) { + return super.getMostRecentCheckpointTxId(unused, request); + } + asyncRouterServer(server::getMostRecentCheckpointTxId, + txid -> GetMostRecentCheckpointTxIdResponseProto + .newBuilder().setTxId(txid).build()); + return null; + } + + @Override + public GetMostRecentNameNodeFileTxIdResponseProto getMostRecentNameNodeFileTxId( + RpcController unused, GetMostRecentNameNodeFileTxIdRequestProto request) + throws ServiceException { + if (!isAsyncRpc) { + return super.getMostRecentNameNodeFileTxId(unused, request); + } + asyncRouterServer(() -> server.getMostRecentNameNodeFileTxId( + NNStorage.NameNodeFile.valueOf(request.getNameNodeFile())), + txid -> GetMostRecentNameNodeFileTxIdResponseProto + .newBuilder().setTxId(txid).build()); + return null; + } + + @Override + public RollEditLogResponseProto rollEditLog(RpcController unused, + RollEditLogRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.rollEditLog(unused, request); + } + asyncRouterServer(server::rollEditLog, + signature -> RollEditLogResponseProto.newBuilder() + .setSignature(PBHelper.convert(signature)).build()); + return null; + } + + @Override + public ErrorReportResponseProto errorReport(RpcController unused, + ErrorReportRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.errorReport(unused, request); + } + asyncRouterServer(() -> { + server.errorReport(PBHelper.convert(request.getRegistration()), + request.getErrorCode(), request.getMsg()); + return null; + }, result -> VOID_ERROR_REPORT_RESPONSE); + return null; + } + + @Override + public RegisterResponseProto registerSubordinateNamenode( + RpcController unused, RegisterRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.registerSubordinateNamenode(unused, request); + } + asyncRouterServer(() -> server.registerSubordinateNamenode( + PBHelper.convert(request.getRegistration())), + reg -> RegisterResponseProto.newBuilder() + .setRegistration(PBHelper.convert(reg)).build()); + return null; + } + + @Override + public StartCheckpointResponseProto startCheckpoint(RpcController unused, + StartCheckpointRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.startCheckpoint(unused, request); + } + asyncRouterServer(() -> + server.startCheckpoint(PBHelper.convert(request.getRegistration())), + cmd -> StartCheckpointResponseProto.newBuilder() + .setCommand(PBHelper.convert(cmd)).build()); + return null; + } + + + @Override + public EndCheckpointResponseProto endCheckpoint(RpcController unused, + EndCheckpointRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.endCheckpoint(unused, request); + } + asyncRouterServer(() -> { + server.endCheckpoint(PBHelper.convert(request.getRegistration()), + PBHelper.convert(request.getSignature())); + return null; + }, result -> VOID_END_CHECKPOINT_RESPONSE); + return null; + } + + @Override + public GetEditLogManifestResponseProto getEditLogManifest( + RpcController unused, GetEditLogManifestRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.getEditLogManifest(unused, request); + } + asyncRouterServer(() -> server.getEditLogManifest(request.getSinceTxId()), + manifest -> GetEditLogManifestResponseProto.newBuilder() + .setManifest(PBHelper.convert(manifest)).build()); + return null; + } + + @Override + public VersionResponseProto versionRequest( + RpcController controller, + VersionRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.versionRequest(controller, request); + } + asyncRouterServer(server::versionRequest, + info -> VersionResponseProto.newBuilder() + .setInfo(PBHelper.convert(info)).build()); + return null; + } + + @Override + public IsUpgradeFinalizedResponseProto isUpgradeFinalized(RpcController controller, + IsUpgradeFinalizedRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.isUpgradeFinalized(controller, request); + } + asyncRouterServer(server::isUpgradeFinalized, + isUpgradeFinalized -> IsUpgradeFinalizedResponseProto.newBuilder() + .setIsUpgradeFinalized(isUpgradeFinalized).build()); + return null; + } + + @Override + public IsRollingUpgradeResponseProto isRollingUpgrade( + RpcController controller, IsRollingUpgradeRequestProto request) + throws ServiceException { + if (!isAsyncRpc) { + return super.isRollingUpgrade(controller, request); + } + asyncRouterServer(server::isRollingUpgrade, + isRollingUpgrade -> IsRollingUpgradeResponseProto.newBuilder() + .setIsRollingUpgrade(isRollingUpgrade).build()); + return null; + } + + @Override + public GetNextSPSPathResponseProto getNextSPSPath( + RpcController controller, GetNextSPSPathRequestProto request) + throws ServiceException { + if (!isAsyncRpc) { + return super.getNextSPSPath(controller, request); + } + asyncRouterServer(server::getNextSPSPath, + nextSPSPath -> GetNextSPSPathResponseProto.newBuilder() + .setSpsPath(nextSPSPath).build()); + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000000..9cffac88ee60c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterRefreshUserMappingsProtocolServerSideTranslatorPB.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; +import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationRequestProto; +import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationResponseProto; +import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsRequestProto; +import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsResponseProto; +import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; +import org.apache.hadoop.thirdparty.protobuf.RpcController; +import org.apache.hadoop.thirdparty.protobuf.ServiceException; + +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer; + +public class RouterRefreshUserMappingsProtocolServerSideTranslatorPB + extends RefreshUserMappingsProtocolServerSideTranslatorPB { + + private final RouterRpcServer server; + private final boolean isAsyncRpc; + + private final static RefreshUserToGroupsMappingsResponseProto + VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE = + RefreshUserToGroupsMappingsResponseProto.newBuilder().build(); + + private final static RefreshSuperUserGroupsConfigurationResponseProto + VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE = + RefreshSuperUserGroupsConfigurationResponseProto.newBuilder() + .build(); + + public RouterRefreshUserMappingsProtocolServerSideTranslatorPB( + RefreshUserMappingsProtocol impl) { + super(impl); + this.server = (RouterRpcServer) impl; + this.isAsyncRpc = server.isAsync(); + } + + @Override + public RefreshUserToGroupsMappingsResponseProto refreshUserToGroupsMappings( + RpcController controller, RefreshUserToGroupsMappingsRequestProto request) + throws ServiceException { + if (!isAsyncRpc) { + return super.refreshUserToGroupsMappings(controller, request); + } + asyncRouterServer(() -> { + server.refreshUserToGroupsMappings(); + return null; + }, result -> + VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE); + return null; + } + + @Override + public RefreshSuperUserGroupsConfigurationResponseProto refreshSuperUserGroupsConfiguration( + RpcController controller, + RefreshSuperUserGroupsConfigurationRequestProto request) throws ServiceException { + if (!isAsyncRpc) { + return super.refreshSuperUserGroupsConfiguration(controller, request); + } + asyncRouterServer(() -> { + server.refreshSuperUserGroupsConfiguration(); + return null; + }, result -> + VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE); + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java index aeed5e678a600..5c3339048465f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.federation.metrics; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.apache.hadoop.util.Time.now; import java.io.IOException; @@ -469,6 +470,9 @@ private String getNodesImpl(final DatanodeReportType type) { this.router.getRpcServer().getClientProtocolModule(); DatanodeStorageReport[] datanodeStorageReports = clientProtocol.getDatanodeStorageReport(type, false, dnReportTimeOut); + if (router.getRpcServer().isAsync()) { + datanodeStorageReports = syncReturn(DatanodeStorageReport[].class); + } for (DatanodeStorageReport datanodeStorageReport : datanodeStorageReports) { DatanodeInfo node = datanodeStorageReport.getDatanodeInfo(); StorageReport[] storageReports = datanodeStorageReport.getStorageReports(); @@ -502,7 +506,7 @@ private String getNodesImpl(final DatanodeReportType type) { LOG.error("Cannot get {} nodes, Router in safe mode", type); } catch (SubClusterTimeoutException e) { LOG.error("Cannot get {} nodes, subclusters timed out responding", type); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Cannot get " + type + " nodes", e); } return JSON.toString(info); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java index 0a28688c916d3..076de9bc303f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.federation.metrics; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.util.Time.now; @@ -559,7 +560,12 @@ public String getNodeUsage() { DatanodeInfo[] live = null; if (this.enableGetDNUsage) { RouterRpcServer rpcServer = this.router.getRpcServer(); - live = rpcServer.getDatanodeReport(DatanodeReportType.LIVE, false, timeOut); + if (rpcServer.isAsync()) { + rpcServer.getDatanodeReportAsync(DatanodeReportType.LIVE, false, timeOut); + live = syncReturn(DatanodeInfo[].class); + } else { + live = rpcServer.getDatanodeReport(DatanodeReportType.LIVE, false, timeOut); + } } else { LOG.debug("Getting node usage is disabled."); } @@ -578,7 +584,7 @@ public String getNodeUsage() { StandardDeviation deviation = new StandardDeviation(); dev = deviation.evaluate(usages); } - } catch (IOException e) { + } catch (Exception e) { LOG.error("Cannot get the live nodes: {}", e.getMessage()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 0c1d3dfbdec2a..060d207837f41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; import java.io.FileNotFoundException; @@ -75,6 +76,10 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; +import org.apache.hadoop.hdfs.protocolPB.RouterClientNamenodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.RouterGetUserMappingsProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.RouterNamenodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.RouterRefreshUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota; import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol; import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol; @@ -328,26 +333,39 @@ public RouterRpcServer(Configuration conf, Router router, RPC.setProtocolEngine(this.conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); - ClientNamenodeProtocolServerSideTranslatorPB - clientProtocolServerTranslator = - new ClientNamenodeProtocolServerSideTranslatorPB(this); + clientProtocolServerTranslator = null; + NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = null; + RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = null; + GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = null; + if (isAsync()) { + clientProtocolServerTranslator = + new RouterClientNamenodeProtocolServerSideTranslatorPB(this); + namenodeProtocolXlator = + new RouterNamenodeProtocolServerSideTranslatorPB(this); + refreshUserMappingXlator = + new RouterRefreshUserMappingsProtocolServerSideTranslatorPB(this); + getUserMappingXlator = + new RouterGetUserMappingsProtocolServerSideTranslatorPB(this); + } else { + clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this); + namenodeProtocolXlator = + new NamenodeProtocolServerSideTranslatorPB(this); + refreshUserMappingXlator = + new RefreshUserMappingsProtocolServerSideTranslatorPB(this); + getUserMappingXlator = + new GetUserMappingsProtocolServerSideTranslatorPB(this); + } BlockingService clientNNPbService = ClientNamenodeProtocol .newReflectiveBlockingService(clientProtocolServerTranslator); - NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = - new NamenodeProtocolServerSideTranslatorPB(this); BlockingService nnPbService = NamenodeProtocolService .newReflectiveBlockingService(namenodeProtocolXlator); - RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = - new RefreshUserMappingsProtocolServerSideTranslatorPB(this); BlockingService refreshUserMappingService = RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService. newReflectiveBlockingService(refreshUserMappingXlator); - GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = - new GetUserMappingsProtocolServerSideTranslatorPB(this); BlockingService getUserMappingService = GetUserMappingsProtocolProtos.GetUserMappingsProtocolService. newReflectiveBlockingService(getUserMappingXlator); @@ -428,7 +446,8 @@ public RouterRpcServer(Configuration conf, Router router, // Create the client if (this.enableAsync) { this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router, - this.namenodeResolver, this.rpcMonitor, routerStateIdContext); + this.namenodeResolver, this.rpcMonitor, + routerStateIdContext, asyncRouterHandler); this.clientProto = new RouterAsyncClientProtocol(conf, this); this.nnProto = new RouterAsyncNamenodeProtocol(this); this.routerProto = new RouterAsyncUserProtocol(this); @@ -1336,8 +1355,13 @@ private DatanodeInfo[] getCachedDatanodeReportImpl( try { DatanodeInfo[] dns = clientProto.getDatanodeReport(type); + if (router.getRpcServer().isAsync()) { + dns = syncReturn(DatanodeInfo[].class); + } LOG.debug("Refresh cached DN report with {} datanodes", dns.length); return dns; + } catch (Exception e) { + throw new IOException(e); } finally { // Reset ugi to remote user for remaining operations. RouterRpcServer.resetCurrentUser(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java index ae44f7aaf1d0d..f991b27b029e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java @@ -794,6 +794,12 @@ public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type) return rpcServer.getDatanodeReportAsync(type, true, 0); } + @Override + public DatanodeInfo[] getSlowDatanodeReport() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + return rpcServer.getSlowDatanodeReportAsync(true, 0); + } + @Override public DatanodeStorageReport[] getDatanodeStorageReport( HdfsConstants.DatanodeReportType type) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 249b7e1c82a4a..32dfab93152fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -108,16 +108,17 @@ public class RouterAsyncRpcClient extends RouterRpcClient{ * @param resolver A NN resolver to determine the currently active NN in HA. * @param monitor Optional performance monitor. * @param routerStateIdContext the router state context object to hold the state ids for all + * @param asyncRouterHandler async router handler * namespaces. */ - public RouterAsyncRpcClient( - Configuration conf, Router router, ActiveNamenodeResolver resolver, - RouterRpcMonitor monitor, RouterStateIdContext routerStateIdContext) { + public RouterAsyncRpcClient(Configuration conf, + Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor, + RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) { super(conf, router, resolver, monitor, routerStateIdContext); this.router = router; this.namenodeResolver = resolver; this.rpcMonitor = monitor; - this.asyncRouterHandler = router.getRpcServer().getAsyncRouterHandler(); + this.asyncRouterHandler = asyncRouterHandler; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java index 79ae88f6bcb0d..7884c229aab78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java @@ -154,6 +154,11 @@ public static void asyncCompleteWith(CompletableFuture completableFuture) CUR_COMPLETABLE_FUTURE.set((CompletableFuture) completableFuture); } + public static CompletableFuture getAsyncUtilCompletableFuture() { + assert CUR_COMPLETABLE_FUTURE.get() != null; + return CUR_COMPLETABLE_FUTURE.get(); + } + /** * Completes the current asynchronous operation with an exception. * This method sets the result of the current thread's {@link CompletableFuture} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 7edb549233062..39b132ce31539 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -163,12 +163,12 @@ public class TestRouterRpc { private static final Logger LOG = LoggerFactory.getLogger(TestRouterRpc.class); - private static final int NUM_SUBCLUSTERS = 2; + protected static final int NUM_SUBCLUSTERS = 2; // We need at least 6 DNs to test Erasure Coding with RS-6-3-64k - private static final int NUM_DNS = 6; + protected static final int NUM_DNS = 6; - private static final Comparator EC_POLICY_CMP = + protected static final Comparator EC_POLICY_CMP = new Comparator() { public int compare( ErasureCodingPolicyInfo ec0, @@ -214,6 +214,18 @@ public int compare( @BeforeClass public static void globalSetUp() throws Exception { + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + setUp(routerConf); + } + + public static void setUp(Configuration routerConf) throws Exception { Configuration namenodeConf = new Configuration(); namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY, true); @@ -242,14 +254,6 @@ public static void globalSetUp() throws Exception { // Start NNs and DNs and wait until ready cluster.startCluster(); - // Start routers with only an RPC service - Configuration routerConf = new RouterConfigBuilder() - .metrics() - .rpc() - .build(); - // We decrease the DN cache times to make the test faster - routerConf.setTimeDuration( - RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); cluster.addRouterOverrides(routerConf); cluster.startRouters(); @@ -1260,8 +1264,8 @@ public void testProxyConcatFile() throws Exception { createFile(routerFS, targetFile, existingFileSize); // Concat in same namespaces, succeeds testConcat(srcEmptyFile, targetFile, true, true, - "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): concat: source file " - + srcEmptyFile + " is invalid or empty or underConstruction"); + "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): " + + "concat: source file " + srcEmptyFile + " is invalid or empty or underConstruction"); } @Test @@ -2023,7 +2027,7 @@ public void testCacheAdmin() throws Exception { } @Test - public void testgetGroupsForUser() throws IOException { + public void testgetGroupsForUser() throws Exception { String[] group = new String[] {"bar", "group2"}; UserGroupInformation.createUserForTesting("user", new String[] {"bar", "group2"}); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java index 51a3a1b9c288c..f7dc29d37d501 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java @@ -112,7 +112,8 @@ public void setUp() throws IOException { RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); + routerRpcServer.getRouterStateIdContext(), + routerRpcServer.getAsyncRouterHandler()); routerAsyncRpcServer = Mockito.spy(routerRpcServer); Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient); Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java index 86ba2b2aed89f..46faf238b83d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java @@ -126,7 +126,8 @@ public void setUp() throws IOException { RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); + routerRpcServer.getRouterStateIdContext(), + routerRpcServer.getAsyncRouterHandler()); RouterRpcServer spy = Mockito.spy(routerRpcServer); Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); asyncErasureCoding = new AsyncErasureCoding(spy); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java index ecbf916aaff69..f7e4c4c3f35ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java @@ -120,7 +120,8 @@ public void setUp() throws IOException { RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); + routerRpcServer.getRouterStateIdContext(), + routerRpcServer.getAsyncRouterHandler()); RouterRpcServer spy = Mockito.spy(routerRpcServer); Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); asyncQuota = new AsyncQuota(router.getRouter(), spy); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java new file mode 100644 index 0000000000000..09fe7c9db9378 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertArrayEquals; + +/** + * Testing the asynchronous RPC functionality of the router. + */ +public class TestRouterAsyncRpc extends TestRouterRpc { + private static MiniRouterDFSCluster cluster; + private MiniRouterDFSCluster.RouterContext rndRouter; + + @BeforeClass + public static void globalSetUp() throws Exception { + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + // use async router. + routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true); + setUp(routerConf); + } + + @Before + public void testSetup() throws Exception { + super.testSetup(); + cluster = super.getCluster(); + // Random router for this test + rndRouter = cluster.getRandomRouter(); + } + + @Test + @Override + public void testgetGroupsForUser() throws Exception { + String[] group = new String[] {"bar", "group2"}; + UserGroupInformation.createUserForTesting("user", + new String[] {"bar", "group2"}); + rndRouter.getRouter().getRpcServer().getGroupsForUser("user"); + String[] result = syncReturn(String[].class); + assertArrayEquals(group, result); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java index e0ce4746cda12..0d6a23407b329 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java @@ -152,7 +152,8 @@ public void setup() throws Exception { asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext()); + routerRpcServer.getRouterStateIdContext(), + routerRpcServer.getAsyncRouterHandler()); // Create a test file FSDataOutputStream fsDataOutputStream = routerFs.create( diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java new file mode 100644 index 0000000000000..0ded95aa06b66 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcMultiDestination; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertArrayEquals; + +/** + * Testing the asynchronous RPC functionality of the router with multiple mounts. + */ +public class TestRouterAsyncRpcMultiDestination extends TestRouterRpcMultiDestination { + + @BeforeClass + public static void globalSetUp() throws Exception { + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + // use async router. + routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true); + setUp(routerConf); + } + + @Test + @Override + public void testgetGroupsForUser() throws Exception { + MiniRouterDFSCluster.RouterContext rndRouter = super.getRouterContext(); + String[] group = new String[] {"bar", "group2"}; + UserGroupInformation.createUserForTesting("user", + new String[] {"bar", "group2"}); + rndRouter.getRouter().getRpcServer().getGroupsForUser("user"); + String[] result = syncReturn(String[].class); + assertArrayEquals(group, result); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 25fcdc3080df0..47629d87a58ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -341,117 +341,109 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements VOID_UNSET_STORAGE_POLICY_RESPONSE = UnsetStoragePolicyResponseProto.newBuilder().build(); - private static final CreateResponseProto VOID_CREATE_RESPONSE = - CreateResponseProto.newBuilder().build(); + protected static final CreateResponseProto VOID_CREATE_RESPONSE = + CreateResponseProto.newBuilder().build(); - private static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE = - SetPermissionResponseProto.newBuilder().build(); + protected static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE = + SetPermissionResponseProto.newBuilder().build(); - private static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE = - SetOwnerResponseProto.newBuilder().build(); + protected static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE = + SetOwnerResponseProto.newBuilder().build(); - private static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE = - AbandonBlockResponseProto.newBuilder().build(); + protected static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE = + AbandonBlockResponseProto.newBuilder().build(); - private static final ReportBadBlocksResponseProto VOID_REP_BAD_BLOCK_RESPONSE = - ReportBadBlocksResponseProto.newBuilder().build(); + protected static final ReportBadBlocksResponseProto VOID_REP_BAD_BLOCK_RESPONSE = + ReportBadBlocksResponseProto.newBuilder().build(); - private static final ConcatResponseProto VOID_CONCAT_RESPONSE = - ConcatResponseProto.newBuilder().build(); + protected static final ConcatResponseProto VOID_CONCAT_RESPONSE = + ConcatResponseProto.newBuilder().build(); - private static final Rename2ResponseProto VOID_RENAME2_RESPONSE = - Rename2ResponseProto.newBuilder().build(); + protected static final Rename2ResponseProto VOID_RENAME2_RESPONSE = + Rename2ResponseProto.newBuilder().build(); - private static final GetListingResponseProto VOID_GETLISTING_RESPONSE = - GetListingResponseProto.newBuilder().build(); + protected static final GetListingResponseProto VOID_GETLISTING_RESPONSE = + GetListingResponseProto.newBuilder().build(); - private static final GetBatchedListingResponseProto + protected static final GetBatchedListingResponseProto VOID_GETBATCHEDLISTING_RESPONSE = GetBatchedListingResponseProto.newBuilder() .setStartAfter(ByteString.copyFromUtf8("")) .setHasMore(false) .build(); - private static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = - RenewLeaseResponseProto.newBuilder().build(); + protected static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = + RenewLeaseResponseProto.newBuilder().build(); - private static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = - RefreshNodesResponseProto.newBuilder().build(); + protected static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = + RefreshNodesResponseProto.newBuilder().build(); - private static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE = - FinalizeUpgradeResponseProto.newBuilder().build(); + protected static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE = + FinalizeUpgradeResponseProto.newBuilder().build(); - private static final MetaSaveResponseProto VOID_METASAVE_RESPONSE = - MetaSaveResponseProto.newBuilder().build(); + protected static final MetaSaveResponseProto VOID_METASAVE_RESPONSE = + MetaSaveResponseProto.newBuilder().build(); - private static final GetFileInfoResponseProto VOID_GETFILEINFO_RESPONSE = - GetFileInfoResponseProto.newBuilder().build(); + protected static final GetFileInfoResponseProto VOID_GETFILEINFO_RESPONSE = + GetFileInfoResponseProto.newBuilder().build(); - private static final GetLocatedFileInfoResponseProto + protected static final GetLocatedFileInfoResponseProto VOID_GETLOCATEDFILEINFO_RESPONSE = GetLocatedFileInfoResponseProto.newBuilder().build(); - private static final GetFileLinkInfoResponseProto VOID_GETFILELINKINFO_RESPONSE = - GetFileLinkInfoResponseProto.newBuilder().build(); + protected static final GetFileLinkInfoResponseProto VOID_GETFILELINKINFO_RESPONSE = + GetFileLinkInfoResponseProto.newBuilder().build(); - private static final SetQuotaResponseProto VOID_SETQUOTA_RESPONSE = - SetQuotaResponseProto.newBuilder().build(); + protected static final SetQuotaResponseProto VOID_SETQUOTA_RESPONSE = + SetQuotaResponseProto.newBuilder().build(); - private static final FsyncResponseProto VOID_FSYNC_RESPONSE = - FsyncResponseProto.newBuilder().build(); + protected static final FsyncResponseProto VOID_FSYNC_RESPONSE = + FsyncResponseProto.newBuilder().build(); - private static final SetTimesResponseProto VOID_SETTIMES_RESPONSE = - SetTimesResponseProto.newBuilder().build(); + protected static final SetTimesResponseProto VOID_SETTIMES_RESPONSE = + SetTimesResponseProto.newBuilder().build(); - private static final CreateSymlinkResponseProto VOID_CREATESYMLINK_RESPONSE = - CreateSymlinkResponseProto.newBuilder().build(); + protected static final CreateSymlinkResponseProto VOID_CREATESYMLINK_RESPONSE = + CreateSymlinkResponseProto.newBuilder().build(); - private static final UpdatePipelineResponseProto - VOID_UPDATEPIPELINE_RESPONSE = - UpdatePipelineResponseProto.newBuilder().build(); + protected static final UpdatePipelineResponseProto VOID_UPDATEPIPELINE_RESPONSE = + UpdatePipelineResponseProto.newBuilder().build(); - private static final CancelDelegationTokenResponseProto - VOID_CANCELDELEGATIONTOKEN_RESPONSE = - CancelDelegationTokenResponseProto.newBuilder().build(); + protected static final CancelDelegationTokenResponseProto VOID_CANCELDELEGATIONTOKEN_RESPONSE = + CancelDelegationTokenResponseProto.newBuilder().build(); - private static final SetBalancerBandwidthResponseProto - VOID_SETBALANCERBANDWIDTH_RESPONSE = - SetBalancerBandwidthResponseProto.newBuilder().build(); + protected static final SetBalancerBandwidthResponseProto VOID_SETBALANCERBANDWIDTH_RESPONSE = + SetBalancerBandwidthResponseProto.newBuilder().build(); - private static final SetAclResponseProto - VOID_SETACL_RESPONSE = SetAclResponseProto.getDefaultInstance(); + protected static final SetAclResponseProto VOID_SETACL_RESPONSE = + SetAclResponseProto.getDefaultInstance(); - private static final ModifyAclEntriesResponseProto - VOID_MODIFYACLENTRIES_RESPONSE = ModifyAclEntriesResponseProto - .getDefaultInstance(); + protected static final ModifyAclEntriesResponseProto VOID_MODIFYACLENTRIES_RESPONSE = + ModifyAclEntriesResponseProto.getDefaultInstance(); - private static final RemoveAclEntriesResponseProto - VOID_REMOVEACLENTRIES_RESPONSE = RemoveAclEntriesResponseProto - .getDefaultInstance(); + protected static final RemoveAclEntriesResponseProto VOID_REMOVEACLENTRIES_RESPONSE = + RemoveAclEntriesResponseProto.getDefaultInstance(); - private static final RemoveDefaultAclResponseProto - VOID_REMOVEDEFAULTACL_RESPONSE = RemoveDefaultAclResponseProto - .getDefaultInstance(); + protected static final RemoveDefaultAclResponseProto VOID_REMOVEDEFAULTACL_RESPONSE = + RemoveDefaultAclResponseProto.getDefaultInstance(); - private static final RemoveAclResponseProto - VOID_REMOVEACL_RESPONSE = RemoveAclResponseProto.getDefaultInstance(); - - private static final SetXAttrResponseProto - VOID_SETXATTR_RESPONSE = SetXAttrResponseProto.getDefaultInstance(); - - private static final RemoveXAttrResponseProto - VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance(); + protected static final RemoveAclResponseProto VOID_REMOVEACL_RESPONSE = + RemoveAclResponseProto.getDefaultInstance(); - private static final CheckAccessResponseProto - VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance(); + protected static final SetXAttrResponseProto VOID_SETXATTR_RESPONSE = + SetXAttrResponseProto.getDefaultInstance(); - private static final SatisfyStoragePolicyResponseProto + protected static final RemoveXAttrResponseProto VOID_REMOVEXATTR_RESPONSE = + RemoveXAttrResponseProto.getDefaultInstance(); + + protected static final CheckAccessResponseProto VOID_CHECKACCESS_RESPONSE = + CheckAccessResponseProto.getDefaultInstance(); + + protected static final SatisfyStoragePolicyResponseProto VOID_SATISFYSTORAGEPOLICY_RESPONSE = SatisfyStoragePolicyResponseProto .getDefaultInstance(); - /** - * Constructor - * + /** Constructor. * @param server - the NN server * @throws IOException */ @@ -730,7 +722,7 @@ public TruncateResponseProto truncate(RpcController controller, @Override public DeleteResponseProto delete(RpcController controller, - DeleteRequestProto req) throws ServiceException { + DeleteRequestProto req) throws ServiceException { try { boolean result = server.delete(req.getSrc(), req.getRecursive()); return DeleteResponseProto.newBuilder().setResult(result).build(); @@ -1474,7 +1466,7 @@ public ListCacheDirectivesResponseProto listCacheDirectives( CacheDirectiveInfo filter = PBHelperClient.convert(request.getFilter()); BatchedEntries entries = - server.listCacheDirectives(request.getPrevId(), filter); + server.listCacheDirectives(request.getPrevId(), filter); ListCacheDirectivesResponseProto.Builder builder = ListCacheDirectivesResponseProto.newBuilder(); builder.setHasMore(entries.hasMore()); @@ -1525,9 +1517,9 @@ public ListCachePoolsResponseProto listCachePools(RpcController controller, ListCachePoolsRequestProto request) throws ServiceException { try { BatchedEntries entries = - server.listCachePools(request.getPrevPoolName()); + server.listCachePools(request.getPrevPoolName()); ListCachePoolsResponseProto.Builder responseBuilder = - ListCachePoolsResponseProto.newBuilder(); + ListCachePoolsResponseProto.newBuilder(); responseBuilder.setHasMore(entries.hasMore()); for (int i=0, n=entries.size(); i entries = server .listEncryptionZones(req.getId()); @@ -1755,7 +1747,7 @@ public GetXAttrsResponseProto getXAttrs(RpcController controller, @Override public ListXAttrsResponseProto listXAttrs(RpcController controller, - ListXAttrsRequestProto req) throws ServiceException { + ListXAttrsRequestProto req) throws ServiceException { try { return PBHelperClient.convertListXAttrsResponse(server.listXAttrs(req.getSrc())); } catch (IOException e) { @@ -1776,7 +1768,7 @@ public RemoveXAttrResponseProto removeXAttr(RpcController controller, @Override public CheckAccessResponseProto checkAccess(RpcController controller, - CheckAccessRequestProto req) throws ServiceException { + CheckAccessRequestProto req) throws ServiceException { try { server.checkAccess(req.getPath(), PBHelperClient.convert(req.getMode())); } catch (IOException e) { @@ -1869,8 +1861,8 @@ public GetErasureCodingPoliciesResponseProto getErasureCodingPolicies(RpcControl GetErasureCodingPoliciesRequestProto request) throws ServiceException { try { ErasureCodingPolicyInfo[] ecpInfos = server.getErasureCodingPolicies(); - GetErasureCodingPoliciesResponseProto.Builder resBuilder = GetErasureCodingPoliciesResponseProto - .newBuilder(); + GetErasureCodingPoliciesResponseProto.Builder resBuilder = + GetErasureCodingPoliciesResponseProto.newBuilder(); for (ErasureCodingPolicyInfo info : ecpInfos) { resBuilder.addEcPolicies( PBHelperClient.convertErasureCodingPolicy(info)); @@ -1965,7 +1957,8 @@ public GetErasureCodingPolicyResponseProto getErasureCodingPolicy(RpcController GetErasureCodingPolicyRequestProto request) throws ServiceException { try { ErasureCodingPolicy ecPolicy = server.getErasureCodingPolicy(request.getSrc()); - GetErasureCodingPolicyResponseProto.Builder builder = GetErasureCodingPolicyResponseProto.newBuilder(); + GetErasureCodingPolicyResponseProto.Builder builder = + GetErasureCodingPolicyResponseProto.newBuilder(); if (ecPolicy != null) { builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index da2440dfa722a..251064654cc37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -73,11 +73,11 @@ public class NamenodeProtocolServerSideTranslatorPB implements NamenodeProtocolPB { private final NamenodeProtocol impl; - private final static ErrorReportResponseProto VOID_ERROR_REPORT_RESPONSE = - ErrorReportResponseProto.newBuilder().build(); + protected final static ErrorReportResponseProto VOID_ERROR_REPORT_RESPONSE = + ErrorReportResponseProto.newBuilder().build(); - private final static EndCheckpointResponseProto VOID_END_CHECKPOINT_RESPONSE = - EndCheckpointResponseProto.newBuilder().build(); + protected final static EndCheckpointResponseProto VOID_END_CHECKPOINT_RESPONSE = + EndCheckpointResponseProto.newBuilder().build(); public NamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) { this.impl = impl;