Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22432 HRegionServer rssStub handling is incorrect and inconsistent #241

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ private long getWriteRequestCount() {
@VisibleForTesting
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException {
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub(false, 0);
if (rss == null) {
// the current server could be stopping.
return;
Expand All @@ -1229,9 +1229,7 @@ protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
// This will be caught and handled as a fatal error in run()
throw ioe;
}
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
// Couldn't connect to the master, get location from zk and reconnect
// Method blocks until new master is found or we are stopped
createRegionServerStatusStub(true);
Expand All @@ -1245,7 +1243,7 @@ protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
* @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
*/
public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub(false, 0);
if (rss == null) {
// the current server could be stopping.
LOG.trace("Skipping Region size report to HMaster as stub is null");
Expand All @@ -1261,9 +1259,7 @@ public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
// The Master is coming up. Will retry the report later. Avoid re-creating the stub.
return true;
}
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
createRegionServerStatusStub(true);
if (ioe instanceof DoNotRetryIOException) {
DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
Expand Down Expand Up @@ -2308,12 +2304,11 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co
// Only go down if clusterConnection is null. It is set to null almost as last thing as the
// HRegionServer does down.
while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub(true, Integer.MAX_VALUE);
if (rss == null) {
break; // That means cluster connection is closed.
}
try {
if (rss == null) {
createRegionServerStatusStub();
continue;
}
ReportRegionStateTransitionResponse response =
rss.reportRegionStateTransition(null, request);
if (response.hasErrorMessage()) {
Expand Down Expand Up @@ -2345,9 +2340,7 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co
ioe);
if (pause) Threads.sleep(pauseTime);
tries++;
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
}
}
return false;
Expand Down Expand Up @@ -2664,8 +2657,10 @@ private boolean keepLooping() {
private RegionServerStartupResponse reportForDuty() throws IOException {
if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
ServerName masterServerName = createRegionServerStatusStub(true);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (masterServerName == null || rss == null) return null;
RegionServerStatusService.BlockingInterface rss = ensureRssStub(false, 0);
if (masterServerName == null || rss == null) {
return null;
}
RegionServerStartupResponse result = null;
try {
rpcServices.requestCount.reset();
Expand Down Expand Up @@ -2697,7 +2692,7 @@ private RegionServerStartupResponse reportForDuty() throws IOException {
} else {
LOG.warn("error telling master we are up", se);
}
rssStub = null;
resetRssStubOnServiceException(rss);
}
return result;
}
Expand All @@ -2707,16 +2702,12 @@ public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
try {
GetLastFlushedSequenceIdRequest req =
RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (rss == null) { // Try to connect one more time
createRegionServerStatusStub();
rss = rssStub;
if (rss == null) {
// Still no luck, we tried
LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
.build();
}
RegionServerStatusService.BlockingInterface rss = ensureRssStub(false, 1);
if (rss == null) {
// Still no luck, we tried
LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
.build();
}
GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
return RegionStoreSequenceIds.newBuilder()
Expand Down Expand Up @@ -3813,12 +3804,7 @@ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
@Override
public boolean reportFileArchivalForQuotas(TableName tableName,
Collection<Entry<String, Long>> archivedFiles) {
RegionServerStatusService.BlockingInterface rss = rssStub;
if (rss == null || rsSpaceQuotaManager == null) {
// the current server could be stopping.
LOG.trace("Skipping file archival reporting to HMaster as stub is null");
return false;
}
RegionServerStatusService.BlockingInterface rss = ensureRssStub(false, 0);
try {
RegionServerStatusProtos.FileArchiveNotificationRequest request =
rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
Expand All @@ -3833,9 +3819,7 @@ public boolean reportFileArchivalForQuotas(TableName tableName,
// The Master is coming up. Will retry the report later. Avoid re-creating the stub.
return false;
}
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
// re-create the stub if we failed to report the archival
createRegionServerStatusStub(true);
LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
Expand Down Expand Up @@ -3864,21 +3848,36 @@ public void remoteProcedureComplete(long procId, Throwable error) {
}

void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
RegionServerStatusService.BlockingInterface rss = rssStub;
for (;;) {
RegionServerStatusService.BlockingInterface rss = ensureRssStub();
try {
rss.reportProcedureDone(null, request);
} catch (ServiceException se) {
resetRssStubOnServiceException(rss);
throw ProtobufUtil.getRemoteException(se);
}
}

private RegionServerStatusService.BlockingInterface ensureRssStub() {
return ensureRssStub(false, Integer.MAX_VALUE);
}

private RegionServerStatusService.BlockingInterface ensureRssStub(
sershe-ms marked this conversation as resolved.
Show resolved Hide resolved
boolean checkClusterConn, int maxRetries) {
RegionServerStatusService.BlockingInterface rss = null;
while (maxRetries-- >= 0 && (!checkClusterConn
|| (checkClusterConn && clusterConnection != null && !clusterConnection.isClosed()))) {
rss = rssStub;
if (rss != null) {
break;
}
createRegionServerStatusStub();
}
try {
rss.reportProcedureDone(null, request);
} catch (ServiceException se) {
if (rssStub == rss) {
rssStub = null;
}
throw ProtobufUtil.getRemoteException(se);
return rss;
}

private void resetRssStubOnServiceException(RegionServerStatusService.BlockingInterface rss) {
if (rssStub == rss) {
rssStub = null;
}
}

Expand Down