From f70ee794729ead347869ec25f2ca8cd3a1ea0af5 Mon Sep 17 00:00:00 2001 From: XinSun Date: Thu, 15 Oct 2020 01:08:54 +0800 Subject: [PATCH] HBASE-25117 ReplicationSourceShipper thread can not be finished (#2521) Signed-off-by: Wellington Chevreuil Signed-off-by: stack Signed-off-by: Guanghao Zhang Signed-off-by: Duo Zhang (cherry picked from commit 78b7244091f294d7e2f59a563d34dac7cf722cd7) --- .../regionserver/HBaseInterClusterReplicationEndpoint.java | 5 +++-- .../hbase/replication/regionserver/ReplicationSource.java | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 1c1e053fc16b..9876ec41ced1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -221,7 +221,7 @@ private void connectToPeers() { * @param sleepMultiplier by how many times the default sleeping time is augmented * @return True if sleepMultiplier is < maxRetriesMultiplier */ - protected boolean sleepForRetries(String msg, int sleepMultiplier) { + private boolean sleepForRetries(String msg, int sleepMultiplier) { try { if (LOG.isTraceEnabled()) { LOG.trace("{} {}, sleeping {} times {}", @@ -229,8 +229,9 @@ protected boolean sleepForRetries(String msg, int sleepMultiplier) { } Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (LOG.isDebugEnabled()) { - LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); + LOG.debug("{} {} Interrupted while sleeping between retries", msg, logPeerId()); } } return sleepMultiplier < maxRetriesMultiplier; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 47269495d0ca..a7ba442cb376 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -582,6 +582,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool worker.entryReader.setReaderRunning(false); } + if (this.replicationEndpoint != null) { + this.replicationEndpoint.stop(); + } for (ReplicationSourceShipper worker : workers) { if (worker.isAlive() || worker.entryReader.isAlive()) { try { @@ -602,9 +605,6 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool } } - if (this.replicationEndpoint != null) { - this.replicationEndpoint.stop(); - } if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries);