diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2b1ea5075cfd..bea4e474fab2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -52,7 +52,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicReference;import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import javax.management.MalformedObjectNameException; import javax.servlet.http.HttpServlet; @@ -349,7 +349,10 @@ public class HRegionServer extends HasThread implements protected final int numRegionsToReport; // Stub to do region server status calls against the master. - private volatile RegionServerStatusService.BlockingInterface rssStub; + private final Object rssStubLock = new Object(); + private RegionServerStatusService.BlockingInterface rssStub; + private boolean resetRssStub; + private volatile LockService.BlockingInterface lockStub; // RPC client. Used to make the stub above that does region server status checking. RpcClient rpcClient; @@ -1140,9 +1143,10 @@ public void run() { shutdownWAL(!abortRequested); } - // Make sure the proxy is down. - if (this.rssStub != null) { + // Make sure the proxy is down + synchronized (rssStubLock) { this.rssStub = null; + this.resetRssStub = false; } if (this.lockStub != null) { this.lockStub = null; @@ -1212,7 +1216,7 @@ private long getWriteRequestCount() { @VisibleForTesting protected void tryRegionServerReport(long reportStartTime, long reportEndTime) throws IOException { - RegionServerStatusService.BlockingInterface rss = rssStub; + RegionServerStatusService.BlockingInterface rss = ensureRssStub(); if (rss == null) { // the current server could be stopping. return; @@ -1229,12 +1233,10 @@ protected void tryRegionServerReport(long reportStartTime, long reportEndTime) // This will be caught and handled as a fatal error in run() throw ioe; } - if (rssStub == rss) { - rssStub = null; - } + resetRssStubOnServiceException(rss); // Couldn't connect to the master, get location from zk and reconnect // Method blocks until new master is found or we are stopped - createRegionServerStatusStub(true); + ensureRssStub(); } } @@ -1245,7 +1247,7 @@ protected void tryRegionServerReport(long reportStartTime, long reportEndTime) * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise */ public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { - RegionServerStatusService.BlockingInterface rss = rssStub; + RegionServerStatusService.BlockingInterface rss = ensureRssStub(); if (rss == null) { // the current server could be stopping. LOG.trace("Skipping Region size report to HMaster as stub is null"); @@ -1261,10 +1263,8 @@ public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { // The Master is coming up. Will retry the report later. Avoid re-creating the stub. return true; } - if (rssStub == rss) { - rssStub = null; - } - createRegionServerStatusStub(true); + resetRssStubOnServiceException(rss); + ensureRssStub(); if (ioe instanceof DoNotRetryIOException) { DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; if (doNotRetryEx.getCause() != null) { @@ -2308,12 +2308,11 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co // Only go down if clusterConnection is null. It is set to null almost as last thing as the // HRegionServer does down. while (this.clusterConnection != null && !this.clusterConnection.isClosed()) { - RegionServerStatusService.BlockingInterface rss = rssStub; + RegionServerStatusService.BlockingInterface rss = ensureRssStub(Integer.MAX_VALUE); + if (rss == null) { + break; // That means cluster connection is closed. + } try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } ReportRegionStateTransitionResponse response = rss.reportRegionStateTransition(null, request); if (response.hasErrorMessage()) { @@ -2345,9 +2344,7 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co ioe); if (pause) Threads.sleep(pauseTime); tries++; - if (rssStub == rss) { - rssStub = null; - } + resetRssStubOnServiceException(rss); } } return false; @@ -2425,7 +2422,7 @@ public void abort(String reason, Throwable cause) { msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); } // Report to the master but only if we have already registered with the master. - RegionServerStatusService.BlockingInterface rss = rssStub; + RegionServerStatusService.BlockingInterface rss = ensureRssStub(); if (rss != null && this.serverName != null) { ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); @@ -2563,9 +2560,9 @@ public ReplicationSinkService getReplicationSinkService() { * @return master + port, or null if server has been stopped */ @VisibleForTesting - protected synchronized ServerName createRegionServerStatusStub() { + protected ServerName createRegionServerStatusStub() { // Create RS stub without refreshing the master node from ZK, use cached data - return createRegionServerStatusStub(false); + return createRegionServerStatusStub(true); } /** @@ -2663,9 +2660,11 @@ private boolean keepLooping() { */ private RegionServerStartupResponse reportForDuty() throws IOException { if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); - ServerName masterServerName = createRegionServerStatusStub(true); - RegionServerStatusService.BlockingInterface rss = rssStub; - if (masterServerName == null || rss == null) return null; + RegionServerStatusService.BlockingInterface rss = ensureRssStub(); + ServerName masterServerName = rss == null ? null : masterAddressTracker.getMasterAddress(); + if (masterServerName == null) { + return null; + } RegionServerStartupResponse result = null; try { rpcServices.requestCount.reset(); @@ -2697,7 +2696,7 @@ private RegionServerStartupResponse reportForDuty() throws IOException { } else { LOG.warn("error telling master we are up", se); } - rssStub = null; + resetRssStubOnServiceException(rss); } return result; } @@ -2707,16 +2706,11 @@ public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { try { GetLastFlushedSequenceIdRequest req = RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); - RegionServerStatusService.BlockingInterface rss = rssStub; - if (rss == null) { // Try to connect one more time - createRegionServerStatusStub(); - rss = rssStub; - if (rss == null) { - // Still no luck, we tried - LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); - return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) - .build(); - } + RegionServerStatusService.BlockingInterface rss = ensureRssStub(1); + if (rss == null) { + LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); + return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) + .build(); } GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); return RegionStoreSequenceIds.newBuilder() @@ -3813,12 +3807,7 @@ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { @Override public boolean reportFileArchivalForQuotas(TableName tableName, Collection> archivedFiles) { - RegionServerStatusService.BlockingInterface rss = rssStub; - if (rss == null || rsSpaceQuotaManager == null) { - // the current server could be stopping. - LOG.trace("Skipping file archival reporting to HMaster as stub is null"); - return false; - } + RegionServerStatusService.BlockingInterface rss = ensureRssStub(); try { RegionServerStatusProtos.FileArchiveNotificationRequest request = rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); @@ -3833,11 +3822,9 @@ public boolean reportFileArchivalForQuotas(TableName tableName, // The Master is coming up. Will retry the report later. Avoid re-creating the stub. return false; } - if (rssStub == rss) { - rssStub = null; - } + resetRssStubOnServiceException(rss); // re-create the stub if we failed to report the archival - createRegionServerStatusStub(true); + ensureRssStub(); LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe); return false; } @@ -3864,21 +3851,43 @@ public void remoteProcedureComplete(long procId, Throwable error) { } void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { - RegionServerStatusService.BlockingInterface rss = rssStub; - for (;;) { - rss = rssStub; - if (rss != null) { - break; - } - createRegionServerStatusStub(); + RegionServerStatusService.BlockingInterface rss = ensureRssStub(Integer.MAX_VALUE); + if (rss == null) { + throw new IOException("Cannot connect to master to report; cluster shutting down?"); } try { rss.reportProcedureDone(null, request); } catch (ServiceException se) { + resetRssStubOnServiceException(rss); + throw ProtobufUtil.getRemoteException(se); + } + } + + private RegionServerStatusService.BlockingInterface ensureRssStub() { + return ensureRssStub(0); + } + + private RegionServerStatusService.BlockingInterface ensureRssStub(int maxRetries) { + RegionServerStatusService.BlockingInterface rss = null; + while (rss == null && maxRetries-- >= 0 + && (clusterConnection != null && !clusterConnection.isClosed())) { + synchronized (rssStubLock) { + if (resetRssStub) { + createRegionServerStatusStub(); + resetRssStub = (rssStub != null); // Ensure we try again in some other call. + } + rss = rssStub; + } + } + return rss; + } + + private void resetRssStubOnServiceException(RegionServerStatusService.BlockingInterface rss) { + synchronized (rssStubLock) { if (rssStub == rss) { rssStub = null; + resetRssStub = true; } - throw ProtobufUtil.getRemoteException(se); } }