From 05970c3cbbb3bb64abd716e208f2a60289d63b64 Mon Sep 17 00:00:00 2001 From: windwheel Date: Mon, 5 Sep 2022 11:02:24 +0800 Subject: [PATCH 1/2] [flaky-test] fix Address already in use' --- .../AutoSwitchRoleIntegrationTest.java | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java index a3ffe2675c2..dfa1d849ac1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java +++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java @@ -18,6 +18,13 @@ package org.apache.rocketmq.test.autoswitchrole; import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.controller.ReplicasManager; import org.apache.rocketmq.common.UtilAll; @@ -35,9 +42,11 @@ import org.apache.rocketmq.store.ha.HAConnectionState; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.test.base.BaseConf; import org.junit.After; import org.junit.Test; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -52,7 +61,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase { private String controllerAddress; private BrokerController brokerController1; private BrokerController brokerController2; - + protected List brokerControllerList; + + public void init(int mappedFileSize) throws Exception { super.initialize(); @@ -78,7 +89,9 @@ public void init(int mappedFileSize) throws Exception { this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize); this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize); - + this.brokerControllerList = ImmutableList.of(brokerController1, brokerController2); + + // Wait slave connecting to master assertTrue(waitSlaveReady(this.brokerController2.getMessageStore())); } @@ -110,18 +123,24 @@ public boolean waitSlaveReady(MessageStore messageStore) throws InterruptedExcep @Test public void testCheckSyncStateSet() throws Exception { init(defaultFileSize); + awaitDispatchMs(6); mockData(); // Check sync state set final ReplicasManager replicasManager = brokerController1.getReplicasManager(); SyncStateSet syncStateSet = replicasManager.getSyncStateSet(); assertEquals(2, syncStateSet.getSyncStateSet().size()); - + + // Shutdown controller2 - this.brokerController2.shutdown(); - - Thread.sleep(5000); + ScheduledExecutorService singleThread = Executors.newSingleThreadScheduledExecutor(); + while (!singleThread.awaitTermination(6* 1000, TimeUnit.MILLISECONDS)) { + this.brokerController2.shutdown(); + singleThread.shutdown(); + } + syncStateSet = replicasManager.getSyncStateSet(); + shutdown(); assertEquals(1, syncStateSet.getSyncStateSet().size()); } @@ -154,6 +173,7 @@ public void testChangeMaster() throws Exception { // Check slave message checkMessage(brokerController1.getMessageStore(), 20, 0); + shutdown(); } @Test @@ -170,6 +190,7 @@ public void testAddBroker() throws Exception { putMessage(this.brokerController1.getMessageStore()); Thread.sleep(3000); checkMessage(broker3.getMessageStore(), 20, 0); + shutdown(); } @Test @@ -222,9 +243,9 @@ public void testTruncateEpochLogAndChangeMaster() throws Exception { waitSlaveReady(broker4.getMessageStore()); Thread.sleep(6000); checkMessage(broker4.getMessageStore(), 10, 10); + shutdown(); } - - @After + public void shutdown() throws InterruptedException { for (BrokerController controller : this.brokerList) { controller.shutdown(); @@ -236,4 +257,24 @@ public void shutdown() throws InterruptedException { } super.destroy(); } + + public boolean awaitDispatchMs(long timeMs) throws Exception { + await().atMost(Duration.ofSeconds(timeMs)).until( + () -> { + boolean allOk = true; + for (BrokerController brokerController: brokerControllerList) { + if (brokerController.getMessageStore() == null) { + allOk = false; + break; + } + } + if (allOk) { + return true; + } + return false; + } + ); + return false; + } + } From 77b4471b9b86e594bf1c06007734799bf2626dff Mon Sep 17 00:00:00 2001 From: windwheel Date: Mon, 5 Sep 2022 11:43:37 +0800 Subject: [PATCH 2/2] Re-apply for a port when the port is occupied --- .../autoswitchrole/AutoSwitchRoleBase.java | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java index e6757909ec2..98e3d77320f 100644 --- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java @@ -18,8 +18,10 @@ package org.apache.rocketmq.test.autoswitchrole; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; @@ -59,7 +61,9 @@ public class AutoSwitchRoleBase { protected List brokerList; private SocketAddress BornHost; private SocketAddress StoreHost; - + private static Integer No= 0; + + protected void initialize() { this.brokerList = new ArrayList<>(); try { @@ -68,9 +72,32 @@ protected void initialize() { } catch (Exception ignored) { } } - - public int nextPort() { - return PORT_COUNTER.addAndGet(10 + random.nextInt(10)); + + public static Integer nextPort() throws IOException { + return nextPort(1001,9999); + } + + public static Integer nextPort(Integer minPort, Integer maxPort) throws IOException { + Random random = new Random(); + int tempPort; + int port; + try{ + while (true){ + tempPort = random.nextInt(maxPort)%(maxPort-minPort+1) + minPort; + ServerSocket serverSocket = new ServerSocket(tempPort); + port = serverSocket.getLocalPort(); + serverSocket.close(); + break; + } + }catch (Exception ignored){ + if (No>200){ + throw new IOException("This server's open ports are temporarily full!"); + } + No++; + port = nextPort(minPort,maxPort); + } + No = 0; + return port; } public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,