From 563a67aa51eb72f042f617c0d21c2de184adca75 Mon Sep 17 00:00:00 2001 From: huiruan Date: Fri, 4 Nov 2022 14:00:11 +0800 Subject: [PATCH 1/2] Reset sizeOfLogQueue when refresh replication source --- .../hbase/replication/regionserver/MetricsSource.java | 8 ++++++-- .../regionserver/ReplicationSourceManager.java | 3 +++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 14a753791da3..de1023ebd3b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -158,8 +158,12 @@ public void incrSizeOfLogQueue() { } public void decrSizeOfLogQueue() { - singleSourceSource.decrSizeOfLogQueue(1); - globalSourceSource.decrSizeOfLogQueue(1); + decrSizeOfLogQueue(1); + } + + public void decrSizeOfLogQueue(int size) { + singleSourceSource.decrSizeOfLogQueue(size); + globalSourceSource.decrSizeOfLogQueue(size); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 2973db521bd1..7a87a9af0f47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -467,6 +467,9 @@ public void refreshSources(String peerId) throws IOException { ReplicationSourceInterface toRemove = this.sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); + // Reset sizeOfLogQueue, log will re enqueue to the created new source. + toRemove.getSourceMetrics() + .decrSizeOfLogQueue(toRemove.getSourceMetrics().getSizeOfLogQueue()); // Do not clear metrics toRemove.terminate(terminateMessage, null, false); } From dc2948fe7bb3a7c150ec1441a5577ea7de32d4cc Mon Sep 17 00:00:00 2001 From: huiruan Date: Wed, 9 Nov 2022 21:18:34 +0800 Subject: [PATCH 2/2] Reset sizeOfLogQueue when refresh replication source --- .../replication/regionserver/MetricsSource.java | 8 ++------ .../regionserver/ReplicationSourceManager.java | 12 +++++------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index de1023ebd3b7..14a753791da3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -158,12 +158,8 @@ public void incrSizeOfLogQueue() { } public void decrSizeOfLogQueue() { - decrSizeOfLogQueue(1); - } - - public void decrSizeOfLogQueue(int size) { - singleSourceSource.decrSizeOfLogQueue(size); - globalSourceSource.decrSizeOfLogQueue(size); + singleSourceSource.decrSizeOfLogQueue(1); + globalSourceSource.decrSizeOfLogQueue(1); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 7a87a9af0f47..74816074c783 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -461,18 +461,16 @@ public void refreshSources(String peerId) throws IOException { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); - ReplicationSourceInterface src = createSource(peerId, peer); + ReplicationSourceInterface src; // synchronized on latestPaths to avoid missing the new log synchronized (this.latestPaths) { - ReplicationSourceInterface toRemove = this.sources.put(peerId, src); + ReplicationSourceInterface toRemove = this.sources.remove(peerId); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - // Reset sizeOfLogQueue, log will re enqueue to the created new source. - toRemove.getSourceMetrics() - .decrSizeOfLogQueue(toRemove.getSourceMetrics().getSizeOfLogQueue()); - // Do not clear metrics - toRemove.terminate(terminateMessage, null, false); + toRemove.terminate(terminateMessage, null, true); } + src = createSource(peerId, peer); + this.sources.put(peerId, src); for (NavigableSet walsByGroup : walsById.get(peerId).values()) { walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); }