Skip to content

Commit

Permalink
HBASE-25117 ReplicationSourceShipper thread can not be finished (#2521)
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Chevreuil <[email protected]>
Signed-off-by: stack <[email protected]>
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>

(cherry picked from commit 78b7244)
  • Loading branch information
ddupg authored and wchevreuil committed Oct 14, 2020
1 parent 219aa12 commit f70ee79
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,17 @@ private void connectToPeers() {
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
private boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}",
logPeerId(), msg, sleepForRetries, sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (LOG.isDebugEnabled()) {
LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
LOG.debug("{} {} Interrupted while sleeping between retries", msg, logPeerId());
}
}
return sleepMultiplier < maxRetriesMultiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
worker.entryReader.setReaderRunning(false);
}

if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
Expand All @@ -602,9 +605,6 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
}
}

if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
Expand Down

0 comments on commit f70ee79

Please sign in to comment.