From f5bfccd3c7a7f4a760612bfbc6d89b75a90d7577 Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Tue, 26 Jul 2022 09:22:10 -0400 Subject: [PATCH 01/11] LIS-4916 Refactor and add round-robin publishing strategy Also, bumping JDK version to 8 and nsq version in docker-compose.yml to 1.2.1 --- docker-compose.yml | 6 +- pom.xml | 4 +- .../com/sproutsocial/nsq/BalanceStrategy.java | 25 ++++ .../sproutsocial/nsq/ConnectionDetails.java | 98 +++++++++++++++ .../nsq/FailoverBalenceStrategy.java | 29 +++++ .../nsq/ListBasedBallenceStrategy.java | 80 ++++++++++++ .../java/com/sproutsocial/nsq/Publisher.java | 114 +++--------------- .../nsq/RoundRobbinBallenceStrategy.java | 43 +++++++ .../nsq/SingleNsqdBallenceStrategy.java | 71 +++++++++++ 9 files changed, 370 insertions(+), 100 deletions(-) create mode 100644 src/main/java/com/sproutsocial/nsq/BalanceStrategy.java create mode 100644 src/main/java/com/sproutsocial/nsq/ConnectionDetails.java create mode 100644 src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java create mode 100644 src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java create mode 100644 src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java create mode 100644 src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java diff --git a/docker-compose.yml b/docker-compose.yml index fc16a3f..a4f4227 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,13 +2,13 @@ version: "3" services: test_nsqlookupd: - image: nsqio/nsq:v0.3.8 + image: nsqio/nsq:v1.2.1 command: /nsqlookupd ports: - "127.0.0.1:4160:4160" - "127.0.0.1:4161:4161" test_nsqd: - image: nsqio/nsq:v0.3.8 + image: nsqio/nsq:v1.2.1 links: - test_nsqlookupd:test_nsqlookupd command: /nsqd --lookupd-tcp-address=test_nsqlookupd:4160 --broadcast-address=test_nsqd --tcp-address=test_nsqd:4150 @@ -16,7 +16,7 @@ services: - "127.0.0.1:4150:4150" - "127.0.0.1:4151:4151" test_nsqadmin: - image: nsqio/nsq:v0.3.8 + image: nsqio/nsq:v1.2.1 links: - 
test_nsqlookupd:test_nsqlookupd - test_nsqd:test_nsqd diff --git a/pom.xml b/pom.xml index fb91e0f..735386a 100644 --- a/pom.xml +++ b/pom.xml @@ -40,8 +40,8 @@ maven-compiler-plugin 3.10.0 - 1.6 - 1.6 + 1.8 + 1.8 diff --git a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java new file mode 100644 index 0000000..a2384fb --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java @@ -0,0 +1,25 @@ +package com.sproutsocial.nsq; + +import java.io.IOException; + +public interface BalanceStrategy { + static BalanceStrategy build(String nsqd, String failoverNsqd, Publisher parent, Client client) { + if (nsqd.contains(",")) { + return new RoundRobbinBallenceStrategy(client, parent, nsqd, failoverNsqd); + } else if (failoverNsqd == null) { + return new SingleNsqdBallenceStrategy(client, parent, nsqd); + } else { + return new FailoverBalenceStrategy(client, nsqd, failoverNsqd, parent); + } + } + + PubConnection getConnection(); + + void lastPublishFailed(); + + void connectionClosed(PubConnection closedCon); + + int getFailoverDurationSecs(); + + void setFailoverDurationSecs(int failoverDurationSecs); +} diff --git a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java new file mode 100644 index 0000000..c0b6156 --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java @@ -0,0 +1,98 @@ +package com.sproutsocial.nsq; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.slf4j.LoggerFactory.getLogger; + +class ConnectionDetails { + private static final Logger LOGGER = getLogger(ConnectionDetails.class); + private final Publisher parent; + private final BasePubSub basePubSub; + HostAndPort hostAndPort; + PubConnection con = null; + boolean isFailover = false; + long failoverStart = 0; + private volatile int failoverDurationSecs; + + public 
ConnectionDetails(String hostAndPort, Publisher parent, int failoverDurationSecs, BasePubSub basePubSub) { + this.hostAndPort = HostAndPort.fromString(hostAndPort).withDefaultPort(4150); + this.parent = parent; + this.failoverDurationSecs = failoverDurationSecs; + this.basePubSub = basePubSub; + } + + /** + * @return true if this host is ready to receive data + */ + protected synchronized boolean makeReady() { + if (con == null) { + if (parent.isStopping) { + throw new NSQException("publisher stopped"); + } + return connectAttempt(); + } else if (isFailover && canAttemptRecovery()) { + isFailover = false; + connectAttempt(); + return true; + } + return !(isFailover || con == null); + } + + private boolean canAttemptRecovery() { + return Util.clock() - failoverStart > TimeUnit.SECONDS.toMillis(failoverDurationSecs); + } + + private boolean connectAttempt() { + if (con != null) { + con.close(); + } + con = new PubConnection(basePubSub.getClient(), this.hostAndPort, parent); + try { + con.connect(basePubSub.getConfig()); + } catch (IOException e) { + markFailure(); + return false; + } + LOGGER.info("publisher connected:{}", hostAndPort); + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConnectionDetails that = (ConnectionDetails) o; + return hostAndPort.equals(that.hostAndPort); + } + + @Override + public int hashCode() { + return hostAndPort.hashCode(); + } + + protected synchronized void markFailure() { + Util.closeQuietly(con); + con = null; + isFailover = true; + failoverStart = Util.clock(); + LOGGER.warn("Failed to connect to {}, will retry later", hostAndPort); + } + + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + } + + @Override + public String toString() { + return "ConnectionDetails{" + + "hostAndPort=" + hostAndPort + + ", con=" + con + + ", isFailover=" + isFailover + + ", 
failoverStart=" + failoverStart + + ", failoverDurationSecs=" + failoverDurationSecs + + '}'; + } +} diff --git a/src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java b/src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java new file mode 100644 index 0000000..d6362ed --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java @@ -0,0 +1,29 @@ +package com.sproutsocial.nsq; + +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; + +import java.util.Arrays; + +import static org.slf4j.LoggerFactory.getLogger; + +@ThreadSafe +public class FailoverBalenceStrategy extends ListBasedBallenceStrategy implements BalanceStrategy { + + public FailoverBalenceStrategy(Client client, String nsqd, String failoverNsqd, Publisher parent) { + super(client, parent, Arrays.asList(nsqd, failoverNsqd)); + } + + @Override + public ConnectionDetails getCurrentConnectionDetails() { + for (int attempts = 0; attempts < daemonList.size(); attempts++) { + ConnectionDetails candidate = daemonList.get(attempts); + if (candidate.makeReady()) { + return candidate; + } + } + throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); + } + + +} diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java new file mode 100644 index 0000000..713ec57 --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java @@ -0,0 +1,80 @@ +package com.sproutsocial.nsq; + +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.slf4j.LoggerFactory.getLogger; + +@ThreadSafe +public abstract class ListBasedBallenceStrategy extends BasePubSub implements BalanceStrategy { + private static final Logger logger = getLogger(ListBasedBallenceStrategy.class); + 
protected final List daemonList; + private Publisher parent; + private int failoverDurationSecs = 300; + private ConnectionDetails lastReturnedConnectionDetails; + + public ListBasedBallenceStrategy(Client client, Publisher parent, List nsqd) { + super(client); + this.parent = parent; + List connectionDetails = new ArrayList<>(); + for (String host : nsqd) { + if (host != null) + connectionDetails.add(new ConnectionDetails(host, + this.parent, + this.failoverDurationSecs, + this)); + } + daemonList = Collections.unmodifiableList(connectionDetails); + } + + @Override + public PubConnection getConnection() { + lastReturnedConnectionDetails = getCurrentConnectionDetails(); + return lastReturnedConnectionDetails.con; + } + + @Override + public void lastPublishFailed() { + lastReturnedConnectionDetails.markFailure(); + } + + @Override + public synchronized void connectionClosed(PubConnection closedCon) { + for (ConnectionDetails daemon : daemonList) { + if (daemon.con == closedCon) { + daemon.con = null; + logger.debug("removed closed publisher connection:{}", closedCon.getHost()); + } + } + } + + @Override + public int getFailoverDurationSecs() { + return this.failoverDurationSecs; + } + + @Override + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + for (ConnectionDetails connectionDetails : daemonList) { + connectionDetails.setFailoverDurationSecs(failoverDurationSecs); + } + } + + public abstract ConnectionDetails getCurrentConnectionDetails(); + + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + + "daemonList=" + daemonList + + ", failoverDurationSecs=" + failoverDurationSecs + + ", lastReturnedConnectionDetails=" + lastReturnedConnectionDetails + + '}'; + } +} diff --git a/src/main/java/com/sproutsocial/nsq/Publisher.java b/src/main/java/com/sproutsocial/nsq/Publisher.java index e175a0b..46081ec 100644 --- a/src/main/java/com/sproutsocial/nsq/Publisher.java 
+++ b/src/main/java/com/sproutsocial/nsq/Publisher.java @@ -1,6 +1,5 @@ package com.sproutsocial.nsq; -import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,26 +17,17 @@ @ThreadSafe public class Publisher extends BasePubSub { - - private final HostAndPort nsqd; - private final HostAndPort failoverNsqd; - private PubConnection con; - private boolean isFailover = false; - private long failoverStart; - private int failoverDurationSecs = 300; - private final Map batchers = new HashMap(); - private ScheduledExecutorService batchExecutor; - private static final int DEFAULT_MAX_BATCH_SIZE = 16 * 1024; private static final int DEFUALT_MAX_BATCH_DELAY = 300; - private static final Logger logger = LoggerFactory.getLogger(Publisher.class); + private final BalanceStrategy balanceStrategy; + private final Map batchers = new HashMap<>(); + private ScheduledExecutorService batchExecutor; public Publisher(Client client, String nsqd, String failoverNsqd) { super(client); - this.nsqd = HostAndPort.fromString(nsqd).withDefaultPort(4150); - this.failoverNsqd = failoverNsqd != null ? 
HostAndPort.fromString(failoverNsqd).withDefaultPort(4150) : null; client.addPublisher(this); + balanceStrategy = BalanceStrategy.build(nsqd, failoverNsqd, this, client); } public Publisher(String nsqd, String failoverNsqd) { @@ -48,56 +38,22 @@ public Publisher(String nsqd) { this(nsqd, null); } - @GuardedBy("this") - private void checkConnection() throws IOException { - if (con == null) { - if (isStopping) { - throw new NSQException("publisher stopped"); - } - connect(nsqd); - } - else if (isFailover && Util.clock() - failoverStart > failoverDurationSecs * 1000) { - isFailover = false; - connect(nsqd); - logger.info("using primary nsqd"); - } - } - - @GuardedBy("this") - private void connect(HostAndPort host) throws IOException { - if (con != null) { - con.close(); - } - con = new PubConnection(client, host, this); - try { - con.connect(config); - } - catch(IOException e) { - con.close(); - con = null; - throw e; - } - logger.info("publisher connected:{}", host); - } public synchronized void connectionClosed(PubConnection closedCon) { - if (con == closedCon) { - con = null; - logger.debug("removed closed publisher connection:{}", closedCon.getHost()); - } + balanceStrategy.connectionClosed(closedCon); } public synchronized void publish(String topic, byte[] data) { checkNotNull(topic); checkNotNull(data); checkArgument(data.length > 0); + PubConnection connection = balanceStrategy.getConnection(); try { - checkConnection(); - con.publish(topic, data); - } - catch (Exception e) { - logger.error("publish error with:{}", isFailover ? 
failoverNsqd : nsqd, e); - publishFailover(topic, data); + connection.publish(topic, data); + } catch (Exception e) { + balanceStrategy.lastPublishFailed(); + logger.error("publish error with", e); + publish(topic, data); } } @@ -108,10 +64,8 @@ public synchronized void publishDeferred(String topic, byte[] data, long delay, checkArgument(delay > 0); checkNotNull(unit); try { - checkConnection(); - con.publishDeferred(topic, data, unit.toMillis(delay)); - } - catch (Exception e) { + balanceStrategy.getConnection().publishDeferred(topic, data, unit.toMillis(delay)); + } catch (Exception e) { //deferred publish never fails over throw new NSQException("deferred publish failed", e); } @@ -122,38 +76,10 @@ public synchronized void publish(String topic, List dataList) { checkNotNull(dataList); checkArgument(dataList.size() > 0); try { - checkConnection(); - con.publish(topic, dataList); - } - catch (Exception e) { - logger.error("publish error with:{}", isFailover ? failoverNsqd : nsqd, e); - for (byte[] data : dataList) { - publishFailover(topic, data); - } - } - } - - @GuardedBy("this") - private void publishFailover(String topic, byte[] data) { - try { - if (failoverNsqd == null) { - logger.warn("publish failed but no failoverNsqd configured. 
Will wait and retry once."); - Util.sleepQuietly(10000); //could do exponential backoff or make configurable - connect(nsqd); - } - else if (!isFailover) { - failoverStart = Util.clock(); - isFailover = true; - connect(failoverNsqd); - logger.info("using failover nsqd:{}", failoverNsqd); - } - con.publish(topic, data); - } - catch (Exception e) { - Util.closeQuietly(con); - con = null; - isFailover = false; - throw new NSQException("publish failed", e); + balanceStrategy.getConnection().publish(topic, dataList); + } catch (Exception e) { + logger.error("publish error", e); + publish(topic, dataList); } } @@ -191,8 +117,6 @@ public synchronized void stop() { batcher.sendBatch(); } super.stop(); - Util.closeQuietly(con); - con = null; if (batchExecutor != null) { Util.shutdownAndAwaitTermination(batchExecutor, 40, TimeUnit.MILLISECONDS); } @@ -202,11 +126,11 @@ public synchronized void stop() { } public synchronized int getFailoverDurationSecs() { - return failoverDurationSecs; + return balanceStrategy.getFailoverDurationSecs(); } public synchronized void setFailoverDurationSecs(int failoverDurationSecs) { - this.failoverDurationSecs = failoverDurationSecs; + balanceStrategy.setFailoverDurationSecs(failoverDurationSecs); } } diff --git a/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java new file mode 100644 index 0000000..3bfc7e1 --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java @@ -0,0 +1,43 @@ +package com.sproutsocial.nsq; + +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.slf4j.LoggerFactory.getLogger; + +@ThreadSafe +public class RoundRobbinBallenceStrategy extends ListBasedBallenceStrategy implements BalanceStrategy { + private int nextDaemonIndex = 0; + + public RoundRobbinBallenceStrategy(Client client, Publisher 
parent, String nsqd, String failoverNsqd) { + super(client, parent, getHostNames(nsqd, failoverNsqd)); + } + + private static List getHostNames(String nsqd, String failoverNsqd) { + List out = new ArrayList<>(); + out.addAll(Arrays.asList(nsqd.split(","))); + out.addAll(Arrays.asList(failoverNsqd.split(","))); + return out; + } + + @Override + public ConnectionDetails getCurrentConnectionDetails() { + for (int attempts = 0; attempts < daemonList.size(); attempts++) { + ConnectionDetails candidate = daemonList.get(nextDaemonIndex); + if (candidate.makeReady()) { + return candidate; + } + nextDaemonIndex++; + if (nextDaemonIndex >= daemonList.size()) { + nextDaemonIndex = 0; + } + } + throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); + } + + +} diff --git a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java new file mode 100644 index 0000000..450992b --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java @@ -0,0 +1,71 @@ +package com.sproutsocial.nsq; + +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; + +import java.io.IOException; + +import static org.slf4j.LoggerFactory.getLogger; + +@ThreadSafe +public class SingleNsqdBallenceStrategy extends BasePubSub implements BalanceStrategy { + private static final Logger logger = getLogger(SingleNsqdBallenceStrategy.class); + protected final ConnectionDetails daemon; + private final Publisher parent; + private int failoverDurationSecs = 10; + + public SingleNsqdBallenceStrategy(Client client, Publisher parent, String nsqd) { + super(client); + this.parent = parent; + daemon = new ConnectionDetails(nsqd, + this.parent, + this.failoverDurationSecs, + this); + } + + @Override + public PubConnection getConnection() { + if (!daemon.makeReady()) { + logger.warn("We aren't able to connect just now, so we are going to sleep for {} seconds", 
failoverDurationSecs); + Util.sleepQuietly(failoverDurationSecs * 1000); + } + if (daemon.makeReady()) + return daemon.con; + else { + throw new NSQException("Unable to connect"); + } + } + + @Override + public void lastPublishFailed() { + daemon.markFailure(); + } + + @Override + public synchronized void connectionClosed(PubConnection closedCon) { + if (daemon.con == closedCon) { + daemon.con = null; + logger.debug("removed closed publisher connection:{}", closedCon.getHost()); + } + } + + @Override + public int getFailoverDurationSecs() { + return failoverDurationSecs; + } + + @Override + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + this.daemon.setFailoverDurationSecs(failoverDurationSecs); + } + + + @Override + public String toString() { + return "SingleNsqdBallenceStrategy{" + + "daemon=" + daemon + + ", failoverDurationSecs=" + failoverDurationSecs + + '}'; + } +} From 9faf3582a21192fb2779fb5977f62d991b1a793f Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Tue, 26 Jul 2022 15:18:31 -0400 Subject: [PATCH 02/11] LIS-4916 Refactor and add round-robin publishing strategy Also, bumping JDK version to 8 and nsq version in docker-compose.yml to 1.2.1 --- README.md | 24 +++++++ .../com/sproutsocial/nsq/BalanceStrategy.java | 5 +- .../nsq/RoundRobbinBallenceStrategy.java | 10 ++- .../com/sproutsocial/nsq/RoundTripIT.java | 66 +++++++++++++++++++ 4 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 src/test/java/com/sproutsocial/nsq/RoundTripIT.java diff --git a/README.md b/README.md index 096dc97..73f9733 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,30 @@ after nsqd responds `OK` You can batch messages manually and publish them all at once with `publish(String topic, List messages)` +### Single NSQ-d host publishing +When we have a single NSQ-d host specified (failoverNsqd is null or not specified, and there are no commas in the nsdq string) +we will reattempt a failed 
publish after 10 seconds by default. This happens in line with synchronous publishes or when +publishing buffered and the batch size is reached. + +If this second attempt fails, the publish call will throw an exception. +### Failover publishing +nsq-j supports failover publishing. If you specify a non-null failoverNsqd parameter and the nsqd string contains no commas, this mode is activated. + +In failover mode, nsq-j prefers publishing to the primary. If that fails, nsq-j will attempt to establish a connection to +the failoverNsqd host and publish there. It will attempt to fail back to the primary after the failure timeout, +which defaults to 300 seconds (5 minutes). + +If publishing to the failoverNsqd also fails, the publish call will throw an exception. + + +### Round-robin publishing +If you specify a comma-separated list as the nsqd parameter, round-robin publishing is used. If a failoverNsqd is provided, +it will also be treated as a comma-separated list and included in the rotation. + +All the hosts listed in nsqd and failoverNsqd will be added to a rotation. Each publish action is sent +to the next host in the rotation. If a publish fails, the host is marked "dead" for 300 seconds (default) before +it will be added back to the rotation. If all hosts are marked dead, the publish call will throw an exception. 
+ ## Subscribe ```java public class PubExample { diff --git a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java index a2384fb..d2704e3 100644 --- a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java @@ -1,9 +1,12 @@ package com.sproutsocial.nsq; -import java.io.IOException; +import static com.sproutsocial.nsq.Util.checkNotNull; public interface BalanceStrategy { static BalanceStrategy build(String nsqd, String failoverNsqd, Publisher parent, Client client) { + checkNotNull(nsqd); + checkNotNull(parent); + checkNotNull(client); if (nsqd.contains(",")) { return new RoundRobbinBallenceStrategy(client, parent, nsqd, failoverNsqd); } else if (failoverNsqd == null) { diff --git a/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java index 3bfc7e1..33e45d9 100644 --- a/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java @@ -1,14 +1,11 @@ package com.sproutsocial.nsq; import net.jcip.annotations.ThreadSafe; -import org.slf4j.Logger; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static org.slf4j.LoggerFactory.getLogger; - @ThreadSafe public class RoundRobbinBallenceStrategy extends ListBasedBallenceStrategy implements BalanceStrategy { private int nextDaemonIndex = 0; @@ -18,9 +15,10 @@ public RoundRobbinBallenceStrategy(Client client, Publisher parent, String nsqd, } private static List getHostNames(String nsqd, String failoverNsqd) { - List out = new ArrayList<>(); - out.addAll(Arrays.asList(nsqd.split(","))); - out.addAll(Arrays.asList(failoverNsqd.split(","))); + List out = new ArrayList<>(Arrays.asList(nsqd.split(","))); + if (failoverNsqd != null) { + out.addAll(Arrays.asList(failoverNsqd.split(","))); + } return out; } diff --git 
a/src/test/java/com/sproutsocial/nsq/RoundTripIT.java b/src/test/java/com/sproutsocial/nsq/RoundTripIT.java new file mode 100644 index 0000000..3623dc4 --- /dev/null +++ b/src/test/java/com/sproutsocial/nsq/RoundTripIT.java @@ -0,0 +1,66 @@ +package com.sproutsocial.nsq; + +import org.junit.Test; + +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + + +public class RoundTripIT extends SubscribeBase { + + public static final String PUBTEST = "pubtest"; + public static final int MAX_MSG_LEN = 20; + public static final int MESSAGES_TO_SEND = 50; + + + @Test + public void testPub() { + System.out.println("testPub. random seed:" + seed); + testPub("localhost,test_nsqd", null); + } + + @Test + public void testSinglePub() { + System.out.println("testSinglePub. random seed:" + seed); + testPub("localhost", null); + } + + @Test + public void testFailoverPub() { + System.out.println("testFailoverPub. random seed:" + seed); + testPub("localhost", "test_nsqd"); + } + + private void testPub(String nsqd, String failoverNsqd) { + + Subscriber subscriber = new Subscriber(10, "127.0.0.1"); + subscriber.subscribe(PUBTEST, "tail" + UUID.randomUUID() + "#ephemeral", handler); + + Util.sleepQuietly(1000); + + Publisher publisher = new Publisher(nsqd, failoverNsqd); + publisher.setConfig(new Config()); + List msgs = messages(MESSAGES_TO_SEND, RoundTripIT.MAX_MSG_LEN); + send(publisher, PUBTEST, msgs, 0.1f, 100); + + Util.sleepQuietly(1000); + + received.sort(String::compareTo); + assertEquals(msgs, received); + } + + private static void send(Publisher publisher, String topic, List msgs, float delayChance, int maxDelay) { + int count = 0; + for (String msg : msgs) { + if (random.nextFloat() < delayChance) { + Util.sleepQuietly(random.nextInt(maxDelay)); + } + publisher.publish(topic, msg.getBytes()); + if (++count % 10 == 0) { + System.out.println("sent " + count + " msgs"); + } + } + } +} From cd1dcc4617c1c9facfd38c7341be15bd30dc2ba8 Mon Sep 
17 00:00:00 2001 From: Erik Helleren Date: Thu, 8 Sep 2022 15:45:27 -0400 Subject: [PATCH 03/11] Doing some serious refactoring - Prefer composition over inheritance for strategies - Cleanup use of connection details - State machine for connection details - Change how construction of the strategies work. --- .../com/sproutsocial/nsq/BalanceStrategy.java | 18 +--- .../sproutsocial/nsq/ConnectionDetails.java | 72 +++++++++---- .../nsq/FailoverBalenceStrategy.java | 29 ----- .../nsq/ListBasedBalanceStrategy.java | 102 ++++++++++++++++++ .../nsq/ListBasedBallenceStrategy.java | 80 -------------- .../java/com/sproutsocial/nsq/Publisher.java | 29 +++-- .../nsq/RoundRobbinBallenceStrategy.java | 41 ------- .../nsq/SingleNsqdBallenceStrategy.java | 36 +++---- 8 files changed, 187 insertions(+), 220 deletions(-) delete mode 100644 src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java create mode 100644 src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java delete mode 100644 src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java delete mode 100644 src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java diff --git a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java index d2704e3..8688625 100644 --- a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java @@ -1,24 +1,8 @@ package com.sproutsocial.nsq; -import static com.sproutsocial.nsq.Util.checkNotNull; - public interface BalanceStrategy { - static BalanceStrategy build(String nsqd, String failoverNsqd, Publisher parent, Client client) { - checkNotNull(nsqd); - checkNotNull(parent); - checkNotNull(client); - if (nsqd.contains(",")) { - return new RoundRobbinBallenceStrategy(client, parent, nsqd, failoverNsqd); - } else if (failoverNsqd == null) { - return new SingleNsqdBallenceStrategy(client, parent, nsqd); - } else { - return new 
FailoverBalenceStrategy(client, nsqd, failoverNsqd, parent); - } - } - - PubConnection getConnection(); - void lastPublishFailed(); + ConnectionDetails getConnectionDetails(); void connectionClosed(PubConnection closedCon); diff --git a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java index c0b6156..8cff185 100644 --- a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java +++ b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java @@ -6,18 +6,28 @@ import java.util.concurrent.TimeUnit; import static org.slf4j.LoggerFactory.getLogger; +import static com.sproutsocial.nsq.Util.*; class ConnectionDetails { + private enum State { + CONNECTED, + NOT_CONNECTED, + FAILED + } + private static final Logger LOGGER = getLogger(ConnectionDetails.class); private final Publisher parent; private final BasePubSub basePubSub; HostAndPort hostAndPort; - PubConnection con = null; - boolean isFailover = false; + private PubConnection con = null; long failoverStart = 0; private volatile int failoverDurationSecs; + private State currentState = State.NOT_CONNECTED; public ConnectionDetails(String hostAndPort, Publisher parent, int failoverDurationSecs, BasePubSub basePubSub) { + checkNotNull(hostAndPort); + checkNotNull(parent); + checkNotNull(basePubSub); this.hostAndPort = HostAndPort.fromString(hostAndPort).withDefaultPort(4150); this.parent = parent; this.failoverDurationSecs = failoverDurationSecs; @@ -28,17 +38,15 @@ public ConnectionDetails(String hostAndPort, Publisher parent, int failoverDurat * @return true if this host is ready to receive data */ protected synchronized boolean makeReady() { - if (con == null) { + if (currentState == State.NOT_CONNECTED) { if (parent.isStopping) { throw new NSQException("publisher stopped"); } return connectAttempt(); - } else if (isFailover && canAttemptRecovery()) { - isFailover = false; - connectAttempt(); - return true; + } else if (currentState == State.FAILED && 
canAttemptRecovery()) { + return connectAttempt(); } - return !(isFailover || con == null); + return currentState == State.CONNECTED; } private boolean canAttemptRecovery() { @@ -46,12 +54,13 @@ private boolean canAttemptRecovery() { } private boolean connectAttempt() { - if (con != null) { - con.close(); + if (getCon() != null) { + getCon().close(); } - con = new PubConnection(basePubSub.getClient(), this.hostAndPort, parent); + setCon(new PubConnection(basePubSub.getClient(), this.hostAndPort, parent)); try { - con.connect(basePubSub.getConfig()); + getCon().connect(basePubSub.getConfig()); + currentState = State.CONNECTED; } catch (IOException e) { markFailure(); return false; @@ -73,10 +82,10 @@ public int hashCode() { return hostAndPort.hashCode(); } - protected synchronized void markFailure() { - Util.closeQuietly(con); - con = null; - isFailover = true; + public synchronized void markFailure() { + Util.closeQuietly(getCon()); + setCon(null); + currentState = State.FAILED; failoverStart = Util.clock(); LOGGER.warn("Failed to connect to {}, will retry later", hostAndPort); } @@ -85,14 +94,31 @@ public void setFailoverDurationSecs(int failoverDurationSecs) { this.failoverDurationSecs = failoverDurationSecs; } + + public PubConnection getCon() { + return con; + } + + private void setCon(PubConnection con) { + this.con = con; + } + + public void clearConnection() { + this.con = null; + currentState = State.NOT_CONNECTED; + } + @Override public String toString() { - return "ConnectionDetails{" + - "hostAndPort=" + hostAndPort + - ", con=" + con + - ", isFailover=" + isFailover + - ", failoverStart=" + failoverStart + - ", failoverDurationSecs=" + failoverDurationSecs + - '}'; + final StringBuilder sb = new StringBuilder("ConnectionDetails{"); + sb.append("parent=").append(parent); + sb.append(", basePubSub=").append(basePubSub); + sb.append(", hostAndPort=").append(hostAndPort); + sb.append(", con=").append(con); + sb.append(", 
failoverStart=").append(failoverStart); + sb.append(", failoverDurationSecs=").append(failoverDurationSecs); + sb.append(", currentState=").append(currentState); + sb.append('}'); + return sb.toString(); } } diff --git a/src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java b/src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java deleted file mode 100644 index d6362ed..0000000 --- a/src/main/java/com/sproutsocial/nsq/FailoverBalenceStrategy.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.sproutsocial.nsq; - -import net.jcip.annotations.ThreadSafe; -import org.slf4j.Logger; - -import java.util.Arrays; - -import static org.slf4j.LoggerFactory.getLogger; - -@ThreadSafe -public class FailoverBalenceStrategy extends ListBasedBallenceStrategy implements BalanceStrategy { - - public FailoverBalenceStrategy(Client client, String nsqd, String failoverNsqd, Publisher parent) { - super(client, parent, Arrays.asList(nsqd, failoverNsqd)); - } - - @Override - public ConnectionDetails getCurrentConnectionDetails() { - for (int attempts = 0; attempts < daemonList.size(); attempts++) { - ConnectionDetails candidate = daemonList.get(attempts); - if (candidate.makeReady()) { - return candidate; - } - } - throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); - } - - -} diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java new file mode 100644 index 0000000..0aac672 --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java @@ -0,0 +1,102 @@ +package com.sproutsocial.nsq; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import static com.sproutsocial.nsq.Util.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; + +public class ListBasedBalanceStrategy extends BasePubSub implements 
BalanceStrategy { + private static final Logger logger = getLogger(ListBasedBalanceStrategy.class); + protected final List daemonList; + private Publisher parent; + private Function, ConnectionDetails> connectionDetailsSelector; + private int failoverDurationSecs = 300; + + public static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List nsqd) { + return new ListBasedBalanceStrategy(client, parent, nsqd, new Function, ConnectionDetails>() { + private volatile int nextDaemonIndex = 0; + + @Override + public ConnectionDetails apply(List daemonList) { + for (int attempts = 0; attempts < daemonList.size(); attempts++) { + ConnectionDetails candidate = daemonList.get(nextDaemonIndex); + if (candidate.makeReady()) { + return candidate; + } + nextDaemonIndex++; + if (nextDaemonIndex >= daemonList.size()) { + nextDaemonIndex = 0; + } + } + throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); + } + }); + } + + public static ListBasedBalanceStrategy buildFailoverStrategy(Client client, Publisher parent, List nsqd) { + return new ListBasedBalanceStrategy(client, parent, nsqd, daemonList -> { + for (int attempts = 0; attempts < daemonList.size(); attempts++) { + ConnectionDetails candidate = daemonList.get(attempts); + if (candidate.makeReady()) { + return candidate; + } + } + throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); + }); + } + + public ListBasedBalanceStrategy(Client client, Publisher parent, List nsqd, Function, ConnectionDetails> connectionDetailsSelector) { + super(client); + checkNotNull(parent); + checkNotNull(nsqd); + checkNotNull(connectionDetailsSelector); + + this.parent = parent; + this.connectionDetailsSelector = connectionDetailsSelector; + List connectionDetails = new ArrayList<>(); + for (String host : nsqd) { + if (host != null) + connectionDetails.add(new ConnectionDetails(host, this.parent, 
this.failoverDurationSecs, this)); + } + daemonList = Collections.unmodifiableList(connectionDetails); + } + + @Override + public ConnectionDetails getConnectionDetails() { + return connectionDetailsSelector.apply(daemonList); + } + + @Override + public synchronized void connectionClosed(PubConnection closedCon) { + for (ConnectionDetails daemon : daemonList) { + if (daemon.getCon() == closedCon) { + daemon.clearConnection(); + logger.debug("removed closed publisher connection:{}", closedCon.getHost()); + } + } + } + + @Override + public int getFailoverDurationSecs() { + return this.failoverDurationSecs; + } + + @Override + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + for (ConnectionDetails connectionDetails : daemonList) { + connectionDetails.setFailoverDurationSecs(failoverDurationSecs); + } + } + + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + "daemonList=" + daemonList + ", failoverDurationSecs=" + failoverDurationSecs + '}'; + } +} diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java deleted file mode 100644 index 713ec57..0000000 --- a/src/main/java/com/sproutsocial/nsq/ListBasedBallenceStrategy.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.sproutsocial.nsq; - -import net.jcip.annotations.ThreadSafe; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.slf4j.LoggerFactory.getLogger; - -@ThreadSafe -public abstract class ListBasedBallenceStrategy extends BasePubSub implements BalanceStrategy { - private static final Logger logger = getLogger(ListBasedBallenceStrategy.class); - protected final List daemonList; - private Publisher parent; - private int failoverDurationSecs = 300; - private ConnectionDetails lastReturnedConnectionDetails; - - 
public ListBasedBallenceStrategy(Client client, Publisher parent, List nsqd) { - super(client); - this.parent = parent; - List connectionDetails = new ArrayList<>(); - for (String host : nsqd) { - if (host != null) - connectionDetails.add(new ConnectionDetails(host, - this.parent, - this.failoverDurationSecs, - this)); - } - daemonList = Collections.unmodifiableList(connectionDetails); - } - - @Override - public PubConnection getConnection() { - lastReturnedConnectionDetails = getCurrentConnectionDetails(); - return lastReturnedConnectionDetails.con; - } - - @Override - public void lastPublishFailed() { - lastReturnedConnectionDetails.markFailure(); - } - - @Override - public synchronized void connectionClosed(PubConnection closedCon) { - for (ConnectionDetails daemon : daemonList) { - if (daemon.con == closedCon) { - daemon.con = null; - logger.debug("removed closed publisher connection:{}", closedCon.getHost()); - } - } - } - - @Override - public int getFailoverDurationSecs() { - return this.failoverDurationSecs; - } - - @Override - public void setFailoverDurationSecs(int failoverDurationSecs) { - this.failoverDurationSecs = failoverDurationSecs; - for (ConnectionDetails connectionDetails : daemonList) { - connectionDetails.setFailoverDurationSecs(failoverDurationSecs); - } - } - - public abstract ConnectionDetails getCurrentConnectionDetails(); - - - @Override - public String toString() { - return this.getClass().getSimpleName() + "{" + - "daemonList=" + daemonList + - ", failoverDurationSecs=" + failoverDurationSecs + - ", lastReturnedConnectionDetails=" + lastReturnedConnectionDetails + - '}'; - } -} diff --git a/src/main/java/com/sproutsocial/nsq/Publisher.java b/src/main/java/com/sproutsocial/nsq/Publisher.java index 46081ec..cb08e67 100644 --- a/src/main/java/com/sproutsocial/nsq/Publisher.java +++ b/src/main/java/com/sproutsocial/nsq/Publisher.java @@ -4,13 +4,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; 
+import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static com.sproutsocial.nsq.Util.checkArgument; import static com.sproutsocial.nsq.Util.checkNotNull; @@ -25,17 +27,22 @@ public class Publisher extends BasePubSub { private ScheduledExecutorService batchExecutor; public Publisher(Client client, String nsqd, String failoverNsqd) { + this(client, (c, p) -> ListBasedBalanceStrategy.buildFailoverStrategy(c, p, Arrays.asList(nsqd, failoverNsqd))); + } + + public Publisher(Client client, BiFunction balanceStrategyFactory) { super(client); client.addPublisher(this); - balanceStrategy = BalanceStrategy.build(nsqd, failoverNsqd, this, client); + this.balanceStrategy = balanceStrategyFactory.apply(client, this); } + public Publisher(String nsqd, String failoverNsqd) { this(Client.getDefaultClient(), nsqd, failoverNsqd); } public Publisher(String nsqd) { - this(nsqd, null); + this(Client.getDefaultClient(), (c, p) -> new SingleNsqdBallenceStrategy(c, p, nsqd)); } @@ -47,11 +54,11 @@ public synchronized void publish(String topic, byte[] data) { checkNotNull(topic); checkNotNull(data); checkArgument(data.length > 0); - PubConnection connection = balanceStrategy.getConnection(); + ConnectionDetails connectionDetails = balanceStrategy.getConnectionDetails(); try { - connection.publish(topic, data); + connectionDetails.getCon().publish(topic, data); } catch (Exception e) { - balanceStrategy.lastPublishFailed(); + connectionDetails.markFailure(); logger.error("publish error with", e); publish(topic, data); } @@ -63,10 +70,12 @@ public synchronized void publishDeferred(String topic, byte[] data, long delay, checkArgument(data.length > 0); checkArgument(delay > 0); checkNotNull(unit); + ConnectionDetails connection = 
balanceStrategy.getConnectionDetails(); try { - balanceStrategy.getConnection().publishDeferred(topic, data, unit.toMillis(delay)); + connection.getCon().publishDeferred(topic, data, unit.toMillis(delay)); } catch (Exception e) { - //deferred publish never fails over + connection.markFailure(); + //deferred publish does not retry throw new NSQException("deferred publish failed", e); } } @@ -75,10 +84,12 @@ public synchronized void publish(String topic, List dataList) { checkNotNull(topic); checkNotNull(dataList); checkArgument(dataList.size() > 0); + ConnectionDetails connectionDetails = balanceStrategy.getConnectionDetails(); try { - balanceStrategy.getConnection().publish(topic, dataList); + connectionDetails.getCon().publish(topic, dataList); } catch (Exception e) { logger.error("publish error", e); + connectionDetails.markFailure(); publish(topic, dataList); } } diff --git a/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java deleted file mode 100644 index 33e45d9..0000000 --- a/src/main/java/com/sproutsocial/nsq/RoundRobbinBallenceStrategy.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.sproutsocial.nsq; - -import net.jcip.annotations.ThreadSafe; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -@ThreadSafe -public class RoundRobbinBallenceStrategy extends ListBasedBallenceStrategy implements BalanceStrategy { - private int nextDaemonIndex = 0; - - public RoundRobbinBallenceStrategy(Client client, Publisher parent, String nsqd, String failoverNsqd) { - super(client, parent, getHostNames(nsqd, failoverNsqd)); - } - - private static List getHostNames(String nsqd, String failoverNsqd) { - List out = new ArrayList<>(Arrays.asList(nsqd.split(","))); - if (failoverNsqd != null) { - out.addAll(Arrays.asList(failoverNsqd.split(","))); - } - return out; - } - - @Override - public ConnectionDetails getCurrentConnectionDetails() { - for (int attempts = 
0; attempts < daemonList.size(); attempts++) { - ConnectionDetails candidate = daemonList.get(nextDaemonIndex); - if (candidate.makeReady()) { - return candidate; - } - nextDaemonIndex++; - if (nextDaemonIndex >= daemonList.size()) { - nextDaemonIndex = 0; - } - } - throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); - } - - -} diff --git a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java index 450992b..1f86c58 100644 --- a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java @@ -1,50 +1,44 @@ package com.sproutsocial.nsq; -import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; -import java.io.IOException; - import static org.slf4j.LoggerFactory.getLogger; -@ThreadSafe public class SingleNsqdBallenceStrategy extends BasePubSub implements BalanceStrategy { private static final Logger logger = getLogger(SingleNsqdBallenceStrategy.class); - protected final ConnectionDetails daemon; + protected final ConnectionDetails connectionDetails; private final Publisher parent; private int failoverDurationSecs = 10; public SingleNsqdBallenceStrategy(Client client, Publisher parent, String nsqd) { super(client); this.parent = parent; - daemon = new ConnectionDetails(nsqd, + connectionDetails = new ConnectionDetails(nsqd, this.parent, this.failoverDurationSecs, this); } @Override - public PubConnection getConnection() { - if (!daemon.makeReady()) { + public ConnectionDetails getConnectionDetails() { + if (!connectionDetails.makeReady()) { logger.warn("We aren't able to connect just now, so we are going to sleep for {} seconds", failoverDurationSecs); Util.sleepQuietly(failoverDurationSecs * 1000); + if (connectionDetails.makeReady()) + return connectionDetails; + else { + throw new NSQException("Unable to connect"); + } + } else { + return 
connectionDetails; } - if (daemon.makeReady()) - return daemon.con; - else { - throw new NSQException("Unable to connect"); - } - } - @Override - public void lastPublishFailed() { - daemon.markFailure(); } @Override public synchronized void connectionClosed(PubConnection closedCon) { - if (daemon.con == closedCon) { - daemon.con = null; + if (connectionDetails.getCon() == closedCon) { + connectionDetails.clearConnection(); logger.debug("removed closed publisher connection:{}", closedCon.getHost()); } } @@ -57,14 +51,14 @@ public int getFailoverDurationSecs() { @Override public void setFailoverDurationSecs(int failoverDurationSecs) { this.failoverDurationSecs = failoverDurationSecs; - this.daemon.setFailoverDurationSecs(failoverDurationSecs); + this.connectionDetails.setFailoverDurationSecs(failoverDurationSecs); } @Override public String toString() { return "SingleNsqdBallenceStrategy{" + - "daemon=" + daemon + + "daemon=" + connectionDetails + ", failoverDurationSecs=" + failoverDurationSecs + '}'; } From 691c07d3614fa303902b5a9594dbd66f1b94a4a4 Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Mon, 19 Sep 2022 11:28:04 -0400 Subject: [PATCH 04/11] Migrated to new docker tests for round robin publishing --- .../sproutsocial/nsq/ConnectionDetails.java | 1 - .../nsq/ListBasedBalanceStrategy.java | 12 +- .../java/com/sproutsocial/nsq/Publisher.java | 15 ++- .../sproutsocial/nsq/BaseDockerTestIT.java | 12 ++ .../nsq/RoundRobinDockerTestIT.java | 115 ++++++++++++++++++ .../com/sproutsocial/nsq/RoundTripIT.java | 66 ---------- 6 files changed, 149 insertions(+), 72 deletions(-) create mode 100644 src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java delete mode 100644 src/test/java/com/sproutsocial/nsq/RoundTripIT.java diff --git a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java index 8cff185..3286c57 100644 --- a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java +++ 
b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java @@ -112,7 +112,6 @@ public void clearConnection() { public String toString() { final StringBuilder sb = new StringBuilder("ConnectionDetails{"); sb.append("parent=").append(parent); - sb.append(", basePubSub=").append(basePubSub); sb.append(", hostAndPort=").append(hostAndPort); sb.append(", con=").append(con); sb.append(", failoverStart=").append(failoverStart); diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java index 0aac672..54c8273 100644 --- a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.BiFunction; import java.util.function.Function; import static com.sproutsocial.nsq.Util.checkNotNull; @@ -17,6 +18,10 @@ public class ListBasedBalanceStrategy extends BasePubSub implements BalanceStrat private Function, ConnectionDetails> connectionDetailsSelector; private int failoverDurationSecs = 300; + public static BiFunction getRoundRobinStrategyBuilder(List nsqd) { + return (c, p) -> buildRoundRobinStrategy(c, p, nsqd); + } + public static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List nsqd) { return new ListBasedBalanceStrategy(client, parent, nsqd, new Function, ConnectionDetails>() { private volatile int nextDaemonIndex = 0; @@ -25,13 +30,14 @@ public static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Pu public ConnectionDetails apply(List daemonList) { for (int attempts = 0; attempts < daemonList.size(); attempts++) { ConnectionDetails candidate = daemonList.get(nextDaemonIndex); - if (candidate.makeReady()) { - return candidate; - } + boolean candidateReady = candidate.makeReady(); nextDaemonIndex++; if (nextDaemonIndex >= daemonList.size()) { 
nextDaemonIndex = 0; } + if (candidateReady) { + return candidate; + } } throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); } diff --git a/src/main/java/com/sproutsocial/nsq/Publisher.java b/src/main/java/com/sproutsocial/nsq/Publisher.java index 1ea69c5..947a0c6 100644 --- a/src/main/java/com/sproutsocial/nsq/Publisher.java +++ b/src/main/java/com/sproutsocial/nsq/Publisher.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -27,7 +28,17 @@ public class Publisher extends BasePubSub { private ScheduledExecutorService batchExecutor; public Publisher(Client client, String nsqd, String failoverNsqd) { - this(client, (c, p) -> ListBasedBalanceStrategy.buildFailoverStrategy(c, p, Arrays.asList(nsqd, failoverNsqd))); + this(client, getBalanceStrategyBiFunction(nsqd, failoverNsqd)); + } + + private static BiFunction getBalanceStrategyBiFunction(String nsqd, String failoverNsqd) { + Objects.requireNonNull(nsqd); + + if (failoverNsqd == null) { + return (c, p) -> new SingleNsqdBallenceStrategy(c, p, nsqd); + } else { + return (c, p) -> ListBasedBalanceStrategy.buildFailoverStrategy(c, p, Arrays.asList(nsqd, failoverNsqd)); + } } public Publisher(Client client, BiFunction balanceStrategyFactory) { @@ -42,7 +53,7 @@ public Publisher(String nsqd, String failoverNsqd) { } public Publisher(String nsqd) { - this(Client.getDefaultClient(), (c, p) -> new SingleNsqdBallenceStrategy(c, p, nsqd)); + this(Client.getDefaultClient(), nsqd, null); } diff --git a/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java index a3c34ab..9f4b652 100644 --- a/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java @@ 
-8,6 +8,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -103,6 +104,10 @@ protected void validateFromParticularNsqd(List receivedMessages, int } } + protected Map> mapByNsqd(List messages) { + return messages.stream().collect(Collectors.groupingBy(e -> e.getConnection().getHost().toString())); + } + protected Publisher primaryOnlyPublisher() { return new Publisher(client, cluster.getNsqdNodes().get(0).getTcpHostAndPort().toString(), null); } @@ -113,6 +118,13 @@ protected Publisher backupPublisher() { return publisher; } + protected Publisher roundRobinPublisher() { + List nsqdHosts = cluster.getNsqdNodes().stream().map(e -> e.getTcpHostAndPort().toString()).collect(Collectors.toList()); + Publisher publisher = new Publisher(client, ListBasedBalanceStrategy.getRoundRobinStrategyBuilder(nsqdHosts)); + publisher.setFailoverDurationSecs(5); + return publisher; + } + public void validateReceivedAllMessages(List expected, List actual, boolean validateOrder) { List actualMessages = actual.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); List expectedCopy = new ArrayList<>(expected); diff --git a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java new file mode 100644 index 0000000..9902950 --- /dev/null +++ b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java @@ -0,0 +1,115 @@ +package com.sproutsocial.nsq; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class RoundRobinDockerTestIT extends BaseDockerTestIT { + + private Subscriber subscriber; + private Publisher publisher; + private TestMessageHandler handler; + + @Override + public void setup() { + super.setup(); + 
Util.sleepQuietly(500); + handler = new TestMessageHandler(); + subscriber = new Subscriber(client, 1, 5, cluster.getLookupNode().getHttpHostAndPort().toString()); + subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); + publisher = roundRobinPublisher(); + } + + @Override + public void teardown() throws InterruptedException { + subscriber.stop(); + if (publisher != null) { + publisher.stop(); + } + super.teardown(); + } + + + @Test + public void test_happyPath() { + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + } + + private void validateMessagesSentRoundRobin(List nsqdNodes, int count, List messages, List receivedMessages) { + validateMessagesSentRoundRobin(nsqdNodes, count, messages, receivedMessages, 0); + } + + private void validateMessagesSentRoundRobin(List nsqdNodes, int count, List messages, List receivedMessages, int nodeOffset) { + Map> messagesByNsqd = mapByNsqd(receivedMessages); + for (int i = 0; i < count; i++) { + String nsqdHst = nsqdNodes.get((i+ nodeOffset) % nsqdNodes.size()).getTcpHostAndPort().toString(); + int expectedIndex = i / nsqdNodes.size(); + String nsqdMessage = new String(messagesByNsqd.get(nsqdHst).get(expectedIndex).getData()); + assertEquals("For message " + i + " expect it to be from nsqd " + nsqdHst, messages.get(i), nsqdMessage); + } + } + + @Test + public void test_singleNodeFailed() { + List nsqdNodes = cluster.getNsqdNodes(); + cluster.disconnectNetworkFor(nsqdNodes.get(2)); + publishAndValidateRoundRobinForNodes(nsqdNodes.subList(0, 2), 0); + } + + @Test + public void test_SingleNodeFailsAndRecovers(){ + List nsqdNodes = cluster.getNsqdNodes(); + cluster.disconnectNetworkFor(nsqdNodes.get(2)); + publishAndValidateRoundRobinForNodes(nsqdNodes.subList(0, 2), 0); + cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(2)); + Util.sleepQuietly(5000); + publishAndValidateRoundRobinForNodes(nsqdNodes, 2); + } + + private void publishAndValidateRoundRobinForNodes(List nsqdNodes, int 
nodeOffset) { + int count = 10 * nsqdNodes.size(); + List messages = messages(count, 40); + send(topic, messages, 0.5f, 10, publisher); + List receivedMessages = handler.drainMessagesOrTimeOut(messages.size()); + validateReceivedAllMessages(messages, receivedMessages, false); + validateMessagesSentRoundRobin(nsqdNodes, count, messages, receivedMessages, nodeOffset); + } + + @Test(timeout = 500) + public void test_allNodesDown_throwsException(){ + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { + cluster.disconnectNetworkFor(nsqdNode); + } + int count = 1; + List messages = messages(count, 40); + Assert.assertThrows(NSQException.class,()-> send(topic, messages, 0.5f, 10, publisher)); + } + + @Test() + public void test_allNodesDown_LaterRecovers(){ + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { + cluster.disconnectNetworkFor(nsqdNode); + } + int count = 1; + List messages = messages(count, 40); + Assert.assertThrows(NSQException.class,()-> send(topic, messages, 0.5f, 10, publisher)); + + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { + cluster.reconnectNetworkFor(nsqdNode); + } + + Util.sleepQuietly(5000); + + Assert.assertTrue(handler.drainMessages(1).isEmpty()); + + send(topic, messages, 0.5f, 10, publisher); + List nsqMessages = handler.drainMessagesOrTimeOut(1); + assertEquals(messages.get(0),new String(nsqMessages.get(0).getData())); + } + + +} diff --git a/src/test/java/com/sproutsocial/nsq/RoundTripIT.java b/src/test/java/com/sproutsocial/nsq/RoundTripIT.java deleted file mode 100644 index 3623dc4..0000000 --- a/src/test/java/com/sproutsocial/nsq/RoundTripIT.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.sproutsocial.nsq; - -import org.junit.Test; - -import java.util.List; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; - - -public class RoundTripIT extends SubscribeBase { - - public static final String PUBTEST = "pubtest"; - public static final int MAX_MSG_LEN = 20; - 
public static final int MESSAGES_TO_SEND = 50; - - - @Test - public void testPub() { - System.out.println("testPub. random seed:" + seed); - testPub("localhost,test_nsqd", null); - } - - @Test - public void testSinglePub() { - System.out.println("testSinglePub. random seed:" + seed); - testPub("localhost", null); - } - - @Test - public void testFailoverPub() { - System.out.println("testFailoverPub. random seed:" + seed); - testPub("localhost", "test_nsqd"); - } - - private void testPub(String nsqd, String failoverNsqd) { - - Subscriber subscriber = new Subscriber(10, "127.0.0.1"); - subscriber.subscribe(PUBTEST, "tail" + UUID.randomUUID() + "#ephemeral", handler); - - Util.sleepQuietly(1000); - - Publisher publisher = new Publisher(nsqd, failoverNsqd); - publisher.setConfig(new Config()); - List msgs = messages(MESSAGES_TO_SEND, RoundTripIT.MAX_MSG_LEN); - send(publisher, PUBTEST, msgs, 0.1f, 100); - - Util.sleepQuietly(1000); - - received.sort(String::compareTo); - assertEquals(msgs, received); - } - - private static void send(Publisher publisher, String topic, List msgs, float delayChance, int maxDelay) { - int count = 0; - for (String msg : msgs) { - if (random.nextFloat() < delayChance) { - Util.sleepQuietly(random.nextInt(maxDelay)); - } - publisher.publish(topic, msg.getBytes()); - if (++count % 10 == 0) { - System.out.println("sent " + count + " msgs"); - } - } - } -} From 979a46c7cda2f34b002b4b4853bad1dfa512da58 Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Mon, 19 Sep 2022 13:33:23 -0400 Subject: [PATCH 05/11] Version bumps and adding developers to pom --- pom.xml | 12 ++++++ .../java/com/sproutsocial/nsq/Subscriber.java | 1 + .../nsq/RoundRobinDockerTestIT.java | 40 ++++++++++++++----- .../sproutsocial/nsq/TestMessageHandler.java | 3 +- 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index d2592d8..4d551e2 100644 --- a/pom.xml +++ b/pom.xml @@ -209,6 +209,18 @@ + + Blake Smith + blake@sproutsocial.com + Sprout Social 
+ https://sproutsocial.com + + + Erik Helleren + erik.helleren@sproutsocial.com + Sprout Social + https://sproutsocial.com + Rob Seed rseed@sproutsocial.com diff --git a/src/main/java/com/sproutsocial/nsq/Subscriber.java b/src/main/java/com/sproutsocial/nsq/Subscriber.java index 8b3136c..662acf0 100644 --- a/src/main/java/com/sproutsocial/nsq/Subscriber.java +++ b/src/main/java/com/sproutsocial/nsq/Subscriber.java @@ -162,6 +162,7 @@ protected Set lookupTopic(String topic) { Util.closeQuietly(in); } } + logger.debug("Identified these NQSD for topic {}: {}",topic,nsqds); return nsqds; } diff --git a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java index 9902950..4ab8a19 100644 --- a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java @@ -19,7 +19,8 @@ public void setup() { super.setup(); Util.sleepQuietly(500); handler = new TestMessageHandler(); - subscriber = new Subscriber(client, 1, 5, cluster.getLookupNode().getHttpHostAndPort().toString()); + subscriber = new Subscriber(client, 1, 50, cluster.getLookupNode().getHttpHostAndPort().toString()); + subscriber.setDefaultMaxInFlight(1); subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); publisher = roundRobinPublisher(); } @@ -39,10 +40,6 @@ public void test_happyPath() { publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); } - private void validateMessagesSentRoundRobin(List nsqdNodes, int count, List messages, List receivedMessages) { - validateMessagesSentRoundRobin(nsqdNodes, count, messages, receivedMessages, 0); - } - private void validateMessagesSentRoundRobin(List nsqdNodes, int count, List messages, List receivedMessages, int nodeOffset) { Map> messagesByNsqd = mapByNsqd(receivedMessages); for (int i = 0; i < count; i++) { @@ -94,7 +91,7 @@ public void test_allNodesDown_LaterRecovers(){ for (NsqDockerCluster.NsqdNode 
nsqdNode : cluster.getNsqdNodes()) { cluster.disconnectNetworkFor(nsqdNode); } - int count = 1; + int count = 50; List messages = messages(count, 40); Assert.assertThrows(NSQException.class,()-> send(topic, messages, 0.5f, 10, publisher)); @@ -102,13 +99,36 @@ public void test_allNodesDown_LaterRecovers(){ cluster.reconnectNetworkFor(nsqdNode); } - Util.sleepQuietly(5000); + Util.sleepQuietly(6000); Assert.assertTrue(handler.drainMessages(1).isEmpty()); - send(topic, messages, 0.5f, 10, publisher); - List nsqMessages = handler.drainMessagesOrTimeOut(1); - assertEquals(messages.get(0),new String(nsqMessages.get(0).getData())); + send(topic, messages, 0.5f, 100, publisher); + Util.sleepQuietly(1000); + List nsqMessages = handler.drainMessages(count); + validateReceivedAllMessages(messages,nsqMessages,false); + } + + + + @Test() + public void test_twoNodesDown_LaterRecovers(){ + + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + + cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(0)); + cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(1)); + + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes().subList(2,3), 0); + + cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(0)); + cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(1)); + + Util.sleepQuietly(6000); + + Assert.assertTrue(handler.drainMessages(1).isEmpty()); + + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); } diff --git a/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java b/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java index 301c7d2..a9d1001 100644 --- a/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java +++ b/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java @@ -4,7 +4,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -13,7 +12,7 @@ public class 
TestMessageHandler implements MessageHandler { public final int timeoutMillis; BlockingQueue receivedMessages = new LinkedBlockingQueue<>(); - public TestMessageHandler() { + public TestMessageHandler() { this(DEFAULT_TIMEOUT_MILLIS); } From 99901503d4555d2589c99895daaf20ff52542af7 Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Wed, 21 Sep 2022 09:22:11 -0400 Subject: [PATCH 06/11] Editing tests to take into account the long time it takes for a consumer to reconnect to a failed node. --- .../sproutsocial/nsq/BaseDockerTestIT.java | 9 +++- .../nsq/RoundRobinDockerTestIT.java | 42 +++++++++++-------- .../nsq/SubscriberFocusedDockerTestIT.java | 1 - .../sproutsocial/nsq/TestMessageHandler.java | 4 +- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java index 9f4b652..d1894bd 100644 --- a/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Random; @@ -62,7 +63,7 @@ protected void send(String topic, List msgs, double delayChance, int max } publisher.publish(topic, msg.getBytes()); if (++count % 10 == 0) { - System.out.println("sent " + count + " msgs"); + LOGGER.info("sent {} msgs",count); } } } @@ -105,7 +106,11 @@ protected void validateFromParticularNsqd(List receivedMessages, int } protected Map> mapByNsqd(List messages) { - return messages.stream().collect(Collectors.groupingBy(e -> e.getConnection().getHost().toString())); + Map> byNsqd = messages.stream().collect(Collectors.groupingBy(e -> e.getConnection().getHost().toString())); + for (List m : byNsqd.values()) { + m.sort(Comparator.comparing(a -> new String(a.getData()))); + } + return byNsqd; } protected Publisher primaryOnlyPublisher() { diff --git
a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java index 4ab8a19..21df0e9 100644 --- a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java @@ -2,14 +2,16 @@ import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.slf4j.LoggerFactory.getLogger; public class RoundRobinDockerTestIT extends BaseDockerTestIT { - + private static final Logger LOGGER = getLogger(RoundRobinDockerTestIT.class); private Subscriber subscriber; private Publisher publisher; private TestMessageHandler handler; @@ -18,9 +20,9 @@ public class RoundRobinDockerTestIT extends BaseDockerTestIT { public void setup() { super.setup(); Util.sleepQuietly(500); - handler = new TestMessageHandler(); + //This needs to be crazy long because it can take up to 1 min for nsq-lookup to remove the bad nodes and add them back again so we can reconnect + handler = new TestMessageHandler(60_000); subscriber = new Subscriber(client, 1, 50, cluster.getLookupNode().getHttpHostAndPort().toString()); - subscriber.setDefaultMaxInFlight(1); subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); publisher = roundRobinPublisher(); } @@ -43,7 +45,7 @@ public void test_happyPath() { private void validateMessagesSentRoundRobin(List nsqdNodes, int count, List messages, List receivedMessages, int nodeOffset) { Map> messagesByNsqd = mapByNsqd(receivedMessages); for (int i = 0; i < count; i++) { - String nsqdHst = nsqdNodes.get((i+ nodeOffset) % nsqdNodes.size()).getTcpHostAndPort().toString(); + String nsqdHst = nsqdNodes.get((i + nodeOffset) % nsqdNodes.size()).getTcpHostAndPort().toString(); int expectedIndex = i / nsqdNodes.size(); String nsqdMessage = new String(messagesByNsqd.get(nsqdHst).get(expectedIndex).getData()); 
assertEquals("For message " + i + " expect it to be from nsqd " + nsqdHst, messages.get(i), nsqdMessage); @@ -58,7 +60,7 @@ public void test_singleNodeFailed() { } @Test - public void test_SingleNodeFailsAndRecovers(){ + public void test_SingleNodeFailsAndRecovers() { List nsqdNodes = cluster.getNsqdNodes(); cluster.disconnectNetworkFor(nsqdNodes.get(2)); publishAndValidateRoundRobinForNodes(nsqdNodes.subList(0, 2), 0); @@ -77,23 +79,27 @@ private void publishAndValidateRoundRobinForNodes(List messages = messages(count, 40); - Assert.assertThrows(NSQException.class,()-> send(topic, messages, 0.5f, 10, publisher)); + Assert.assertThrows(NSQException.class, () -> send(topic, messages, 0.5f, 10, publisher)); } + @Test() - public void test_allNodesDown_LaterRecovers(){ + public void test_allNodesDown_LaterRecovers() { + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { cluster.disconnectNetworkFor(nsqdNode); } int count = 50; List messages = messages(count, 40); - Assert.assertThrows(NSQException.class,()-> send(topic, messages, 0.5f, 10, publisher)); + Assert.assertThrows(NSQException.class, () -> send(topic, messages, 0.5f, 10, publisher)); + LOGGER.info(subscriber.toString()); for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { cluster.reconnectNetworkFor(nsqdNode); @@ -101,25 +107,27 @@ public void test_allNodesDown_LaterRecovers(){ Util.sleepQuietly(6000); + // Explicitly recreate the subscrber to get fresh connections. 
Otherwise we would need to wait for the socket timeout of 60 seconds + subscriber.stop(); + subscriber = new Subscriber(client, 1, 50, cluster.getLookupNode().getHttpHostAndPort().toString()); + subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); + Assert.assertTrue(handler.drainMessages(1).isEmpty()); - send(topic, messages, 0.5f, 100, publisher); - Util.sleepQuietly(1000); - List nsqMessages = handler.drainMessages(count); - validateReceivedAllMessages(messages,nsqMessages,false); + send(topic, messages, 0.1f, 50, publisher); + List nsqMessages = handler.drainMessagesOrTimeOut(count); + validateReceivedAllMessages(messages, nsqMessages, false); } - @Test() - public void test_twoNodesDown_LaterRecovers(){ - + public void test_twoNodesDown_LaterRecovers() { publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(0)); cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(1)); - publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes().subList(2,3), 0); + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes().subList(2, 3), 0); cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(0)); cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(1)); diff --git a/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java index c827450..69aaef8 100644 --- a/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; public class SubscriberFocusedDockerTestIT extends BaseDockerTestIT { private Publisher publisher; diff --git a/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java b/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java index a9d1001..87af684 100644 --- 
a/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java +++ b/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java @@ -12,7 +12,7 @@ public class TestMessageHandler implements MessageHandler { public final int timeoutMillis; BlockingQueue receivedMessages = new LinkedBlockingQueue<>(); - public TestMessageHandler() { + public TestMessageHandler() { this(DEFAULT_TIMEOUT_MILLIS); } @@ -31,7 +31,7 @@ public List drainMessagesOrTimeOut(int size) { return drainMessagesOrTimeOut(size, timeoutMillis); } - public List drainMessagesOrTimeOut(int size, int timeoutMillis) { + public List drainMessagesOrTimeOut(int size, long timeoutMillis) { long timeoutTime = System.currentTimeMillis() + timeoutMillis; while (receivedMessages.size() < size && System.currentTimeMillis() < timeoutTime) { Util.sleepQuietly(50); From b9d011911171180b9a81aac59e592e7cfeedec6e Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Wed, 21 Sep 2022 09:52:10 -0400 Subject: [PATCH 07/11] Self review --- .../com/sproutsocial/nsq/BalanceStrategy.java | 5 +++- .../nsq/ListBasedBalanceStrategy.java | 27 +++++++++++++++++-- .../java/com/sproutsocial/nsq/Publisher.java | 2 +- .../nsq/SingleNsqdBallenceStrategy.java | 13 ++++++--- .../java/com/sproutsocial/nsq/Subscriber.java | 1 - 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java index 8688625..e50210c 100644 --- a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java @@ -2,7 +2,10 @@ public interface BalanceStrategy { - ConnectionDetails getConnectionDetails(); + /** + * @throws NSQException When there are no more available connections. 
Should be escalated to the user of the library + */ + ConnectionDetails getConnectionDetails() throws NSQException; void connectionClosed(PubConnection closedCon); diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java index 54c8273..4233661 100644 --- a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java @@ -18,11 +18,34 @@ public class ListBasedBalanceStrategy extends BasePubSub implements BalanceStrat private Function, ConnectionDetails> connectionDetailsSelector; private int failoverDurationSecs = 300; + /** + * Create a list based failover strategy that will alternate between all connected nsqd. Will reconnect to a + * disconnected or failed nsqd on the next publish that could be routed to that nsqd after the failoverDuration has + * expired (Default 5 min). + *

+ * This will throw an NSQException if all nsqd are in a failed state. + * + * @param nsqd a list of strings that represent HostAndPort objects. + */ public static BiFunction getRoundRobinStrategyBuilder(List nsqd) { return (c, p) -> buildRoundRobinStrategy(c, p, nsqd); } - public static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List nsqd) { + /** + * Create a list based failover strategy that shows strong preference to the first nsqd on the list. + *

+ * On publish, find the first nsqd in this list that is in a connected or connectable state. A nsqd is connectable + * if its last failure was more than the configured failoverDuration ago (default 5 min). + *

+ * This will throw an NSQException if all nsqd are in a failed state. + * + * @param nsqd a list of strings that represent HostAndPort objects. + */ + public static BiFunction getFailoverStrategyBuilder(List nsqd) { + return (c, p) -> buildFailoverStrategy(c,p,nsqd); + } + + private static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List nsqd) { return new ListBasedBalanceStrategy(client, parent, nsqd, new Function, ConnectionDetails>() { private volatile int nextDaemonIndex = 0; @@ -44,7 +67,7 @@ public ConnectionDetails apply(List daemonList) { }); } - public static ListBasedBalanceStrategy buildFailoverStrategy(Client client, Publisher parent, List nsqd) { + private static ListBasedBalanceStrategy buildFailoverStrategy(Client client, Publisher parent, List nsqd) { return new ListBasedBalanceStrategy(client, parent, nsqd, daemonList -> { for (int attempts = 0; attempts < daemonList.size(); attempts++) { ConnectionDetails candidate = daemonList.get(attempts); diff --git a/src/main/java/com/sproutsocial/nsq/Publisher.java b/src/main/java/com/sproutsocial/nsq/Publisher.java index 947a0c6..8a3e9ee 100644 --- a/src/main/java/com/sproutsocial/nsq/Publisher.java +++ b/src/main/java/com/sproutsocial/nsq/Publisher.java @@ -37,7 +37,7 @@ private static BiFunction getBalanceStrategy if (failoverNsqd == null) { return (c, p) -> new SingleNsqdBallenceStrategy(c, p, nsqd); } else { - return (c, p) -> ListBasedBalanceStrategy.buildFailoverStrategy(c, p, Arrays.asList(nsqd, failoverNsqd)); + return ListBasedBalanceStrategy.getFailoverStrategyBuilder(Arrays.asList(nsqd, failoverNsqd)); } } diff --git a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java index 1f86c58..305a73e 100644 --- a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java @@ -2,19 +2,24 @@ import
org.slf4j.Logger; +import java.util.concurrent.TimeUnit; + import static org.slf4j.LoggerFactory.getLogger; +/** + * A single NSQD ballence strategy will attempt to publish to the single known NSQD. If that fails on a first attempt + * it will *block* for the failover duration (default 10 seconds) and attempt to reconnect. + */ public class SingleNsqdBallenceStrategy extends BasePubSub implements BalanceStrategy { private static final Logger logger = getLogger(SingleNsqdBallenceStrategy.class); protected final ConnectionDetails connectionDetails; - private final Publisher parent; private int failoverDurationSecs = 10; public SingleNsqdBallenceStrategy(Client client, Publisher parent, String nsqd) { super(client); - this.parent = parent; + logger.warn("You are configured to use a singe NSQD balance strategy. Please Read the manual and make sure you really want that."); connectionDetails = new ConnectionDetails(nsqd, - this.parent, + parent, this.failoverDurationSecs, this); } @@ -23,7 +28,7 @@ public SingleNsqdBallenceStrategy(Client client, Publisher parent, String nsqd) public ConnectionDetails getConnectionDetails() { if (!connectionDetails.makeReady()) { logger.warn("We aren't able to connect just now, so we are going to sleep for {} seconds", failoverDurationSecs); - Util.sleepQuietly(failoverDurationSecs * 1000); + Util.sleepQuietly(TimeUnit.SECONDS.toMillis(failoverDurationSecs)); if (connectionDetails.makeReady()) return connectionDetails; else { diff --git a/src/main/java/com/sproutsocial/nsq/Subscriber.java b/src/main/java/com/sproutsocial/nsq/Subscriber.java index 662acf0..8b3136c 100644 --- a/src/main/java/com/sproutsocial/nsq/Subscriber.java +++ b/src/main/java/com/sproutsocial/nsq/Subscriber.java @@ -162,7 +162,6 @@ protected Set lookupTopic(String topic) { Util.closeQuietly(in); } } - logger.debug("Identified these NQSD for topic {}: {}",topic,nsqds); return nsqds; } From 06abcf6401f2409d5af4c777ff8763d518ba6253 Mon Sep 17 00:00:00 2001 From: Erik 
Helleren Date: Wed, 21 Sep 2022 10:06:56 -0400 Subject: [PATCH 08/11] Update readme --- README.md | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f35431a..9147fcb 100644 --- a/README.md +++ b/README.md @@ -36,28 +36,26 @@ You can batch messages manually and publish them all at once with `publish(String topic, List messages)` ### Single NSQ-d host publishing -When we have a single NSQ-d host specified (failoverNsqd is null or not specified, and there are no commas in the nsdq string) +When we have a single NSQ-d host specified (failoverNsqd is null or not specified when constructing a publisher) we will reattempt a failed publish after 10 seconds by default. This happens in line with synchronous publishes or when publishing buffered and the batch size is reached. -If this second attempt fails, the publish call will throw an exception. +If this second attempt fails, the call to publish will throw an NSQException. + ### Failover publishing -nsq-j supports failover publishing. If you specify a non-null failoverNsqd parameter, and there are no commas, this mode is activated. +nsq-j supports fail over publishing. If you specify a non-null failoverNsqd parameter or manually construct a fail over balance strategy with `ListBasedBalanceStrategy#getFailoverStrategyBuilder` -In failover mode, nsq-j prefers publishing to a primary. If that fails, nsq-j will attempt to establish a connection to -the failoverNsqd host and publish there. It will attempt to fail back to the primary after the failure timeout, -defaulting to 30 seconds. +In fail over mode, nsq-j prefers publishing the first element of the provided list of NSQD. It will fail over to the next nsqd if a publish fails. After the failover duration, the next publish will atempt to fail back to the first connection. Failover duration defaults to 5 min. -If publishing fails to write to the failoverNsqd, the write will throw an exception. 
+If all nsqd are in a failed state (have all failed within the failover duration), the write will throw an NSQException. ### Round-robin publishing -If you specify a comma separated list as the nsqd parameter, round-robin publishing is used. If a failoverNsqd is provided, -it will also be treated as a comma seperated list and included in the rotation. +To use round robin, construct a balen strategy with `ListBasedBalanceStrategy#getRoundRobinStrategyBuilder` providing a list of nsqd to use. -All the hosts that are included in both nsqd and failoverNsqd will be added to a rotation. Each publish action is sent -to the next host in the rotation. If a publishing fails, the host is marked "dead" for 30 seconds (default) before -it will be added back to the rotation. If all hosts are marked dead, an exception will be thrown out of publish. +All the hosts that are included in i the list will be added to a rotation. Each publish action is sent +to the next host in the rotation. If a publishing fails, the host is marked "dead" for the failover duration (5 min default) before +it will be added back to the rotation. If all hosts are marked dead, an NSQException will be thrown out of publish. 
## Subscribe ```java From 5a91a1c1a7d649c22e92091d0519ef75d925263c Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Wed, 21 Sep 2022 10:54:36 -0400 Subject: [PATCH 09/11] Review feedback --- README.md | 8 ++++---- .../sproutsocial/nsq/ConnectionDetails.java | 18 ++++++++---------- .../nsq/ListBasedBalanceStrategy.java | 6 +++--- .../java/com/sproutsocial/nsq/Publisher.java | 3 +-- ...egy.java => SingleNsqdBalanceStrategy.java} | 14 +++++++++----- .../nsq/RoundRobinDockerTestIT.java | 2 +- 6 files changed, 26 insertions(+), 25 deletions(-) rename src/main/java/com/sproutsocial/nsq/{SingleNsqdBallenceStrategy.java => SingleNsqdBalanceStrategy.java} (71%) diff --git a/README.md b/README.md index 9147fcb..e28c9a0 100644 --- a/README.md +++ b/README.md @@ -43,17 +43,17 @@ publishing buffered and the batch size is reached. If this second attempt fails, the call to publish will throw an NSQException. ### Failover publishing -nsq-j supports fail over publishing. If you specify a non-null failoverNsqd parameter or manually construct a fail over balance strategy with `ListBasedBalanceStrategy#getFailoverStrategyBuilder` +nsq-j supports failover publishing. If you specify a non-null failoverNsqd parameter or manually construct a failover balance strategy with `ListBasedBalanceStrategy#getFailoverStrategyBuilder` In fail over mode, nsq-j prefers publishing the first element of the provided list of NSQD. It will fail over to the next nsqd if a publish fails. After the failover duration, the next publish will atempt to fail back to the first connection. Failover duration defaults to 5 min. -If all nsqd are in a failed state (have all failed within the failover duration), the write will throw an NSQException. +If all nsqd are in a failed state (have all failed within the failover duration), the publish will throw an NSQException. 
### Round-robin publishing -To use round robin, construct a balen strategy with `ListBasedBalanceStrategy#getRoundRobinStrategyBuilder` providing a list of nsqd to use. +To use round robin, construct a balance strategy with `ListBasedBalanceStrategy#getRoundRobinStrategyBuilder` providing a list of nsqd to use. -All the hosts that are included in i the list will be added to a rotation. Each publish action is sent +All the hosts that are included in the list will be added to a rotation. Each publish action is sent to the next host in the rotation. If a publishing fails, the host is marked "dead" for the failover duration (5 min default) before it will be added back to the rotation. If all hosts are marked dead, an NSQException will be thrown out of publish. diff --git a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java index 3286c57..fbab4df 100644 --- a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java +++ b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java @@ -87,7 +87,7 @@ public synchronized void markFailure() { setCon(null); currentState = State.FAILED; failoverStart = Util.clock(); - LOGGER.warn("Failed to connect to {}, will retry later", hostAndPort); + LOGGER.warn("Failed to connect to {}, will retry after {} seconds", hostAndPort, this.failoverDurationSecs); } public void setFailoverDurationSecs(int failoverDurationSecs) { @@ -110,14 +110,12 @@ public void clearConnection() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("ConnectionDetails{"); - sb.append("parent=").append(parent); - sb.append(", hostAndPort=").append(hostAndPort); - sb.append(", con=").append(con); - sb.append(", failoverStart=").append(failoverStart); - sb.append(", failoverDurationSecs=").append(failoverDurationSecs); - sb.append(", currentState=").append(currentState); - sb.append('}'); - return sb.toString(); + return "ConnectionDetails{" + "parent=" + parent + + ", 
hostAndPort=" + hostAndPort + + ", con=" + con + + ", failoverStart=" + failoverStart + + ", failoverDurationSecs=" + failoverDurationSecs + + ", currentState=" + currentState + + '}'; } } diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java index 4233661..614629b 100644 --- a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java @@ -14,8 +14,8 @@ public class ListBasedBalanceStrategy extends BasePubSub implements BalanceStrategy { private static final Logger logger = getLogger(ListBasedBalanceStrategy.class); protected final List daemonList; - private Publisher parent; - private Function, ConnectionDetails> connectionDetailsSelector; + private final Publisher parent; + private final Function, ConnectionDetails> connectionDetailsSelector; private int failoverDurationSecs = 300; /** @@ -47,7 +47,7 @@ public static BiFunction getFailoverStrategy private static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List nsqd) { return new ListBasedBalanceStrategy(client, parent, nsqd, new Function, ConnectionDetails>() { - private volatile int nextDaemonIndex = 0; + private int nextDaemonIndex = 0; @Override public ConnectionDetails apply(List daemonList) { diff --git a/src/main/java/com/sproutsocial/nsq/Publisher.java b/src/main/java/com/sproutsocial/nsq/Publisher.java index 8a3e9ee..c748a67 100644 --- a/src/main/java/com/sproutsocial/nsq/Publisher.java +++ b/src/main/java/com/sproutsocial/nsq/Publisher.java @@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,7 +34,7 @@ private static BiFunction getBalanceStrategy Objects.requireNonNull(nsqd); if (failoverNsqd == null) { - return (c, p) -> new SingleNsqdBallenceStrategy(c, p, nsqd); + return 
(c, p) -> new SingleNsqdBalanceStrategy(c, p, nsqd); } else { return ListBasedBalanceStrategy.getFailoverStrategyBuilder(Arrays.asList(nsqd, failoverNsqd)); } diff --git a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java b/src/main/java/com/sproutsocial/nsq/SingleNsqdBalanceStrategy.java similarity index 71% rename from src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java rename to src/main/java/com/sproutsocial/nsq/SingleNsqdBalanceStrategy.java index 305a73e..125481a 100644 --- a/src/main/java/com/sproutsocial/nsq/SingleNsqdBallenceStrategy.java +++ b/src/main/java/com/sproutsocial/nsq/SingleNsqdBalanceStrategy.java @@ -7,17 +7,21 @@ import static org.slf4j.LoggerFactory.getLogger; /** - * A single NSQD ballence strategy will attempt to publish to the single known NSQD. If that fails on a first attempt + * A single NSQD balance strategy will attempt to publish to the single known NSQD. If that fails on a first attempt * it will *block* for the failover duration (default 10 seconds) and attempt to reconnect. */ -public class SingleNsqdBallenceStrategy extends BasePubSub implements BalanceStrategy { - private static final Logger logger = getLogger(SingleNsqdBallenceStrategy.class); +public class SingleNsqdBalanceStrategy extends BasePubSub implements BalanceStrategy { + private static final Logger logger = getLogger(SingleNsqdBalanceStrategy.class); protected final ConnectionDetails connectionDetails; private int failoverDurationSecs = 10; - public SingleNsqdBallenceStrategy(Client client, Publisher parent, String nsqd) { + public SingleNsqdBalanceStrategy(Client client, Publisher parent, String nsqd) { super(client); - logger.warn("You are configured to use a singe NSQD balance strategy. Please Read the manual and make sure you really want that."); + logger.warn("You are configured to use a singe NSQD balance strategy. This has both availability and correctness issues. 
" + + "Nsq-j is sleeping for failover duration on a failed publish inside a critical lock section impacting all threads calling to publish. " + + "This may appear to be lock starvation. " + + "The client is also not resilient to failures in this mode, a single outage can result in dataloss and crashing (slowly). "+ + "Please use failover or round robin balance strategy to avoid these issues"); connectionDetails = new ConnectionDetails(nsqd, parent, this.failoverDurationSecs, diff --git a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java index 21df0e9..fc3cd2e 100644 --- a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java @@ -20,7 +20,7 @@ public class RoundRobinDockerTestIT extends BaseDockerTestIT { public void setup() { super.setup(); Util.sleepQuietly(500); - //This needs to be crazy long because it can take up to 1 min for nsq-lookup to remove the bad nodes and add them back again so we can reconnect + //This needs to be crazy long because it can take up to 1 min for the connections in the subscriber to timeout and reconnect. 
handler = new TestMessageHandler(60_000); subscriber = new Subscriber(client, 1, 50, cluster.getLookupNode().getHttpHostAndPort().toString()); subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); From 0b0625542c6e52cc5afaeac67b5b44fc5ede90cb Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Wed, 21 Sep 2022 10:59:41 -0400 Subject: [PATCH 10/11] Adding more developers --- pom.xml | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 4d551e2..f2739a1 100644 --- a/pom.xml +++ b/pom.xml @@ -222,8 +222,20 @@ https://sproutsocial.com - Rob Seed - rseed@sproutsocial.com + Dan Johnson + dan.johnson@sproutsocial.com + Sprout Social + https://sproutsocial.com + + + Dave Huber + dave.huber@sproutsocial.com + Sprout Social + https://sproutsocial.com + + + Jack Sadanowicz + jack@sproutsocial.com Sprout Social https://sproutsocial.com From 45dfa99e9fed89e62af3c7bd6986ede621db00a0 Mon Sep 17 00:00:00 2001 From: Erik Helleren Date: Thu, 22 Sep 2022 15:12:26 -0400 Subject: [PATCH 11/11] Review Feedback --- README.md | 2 +- .../sproutsocial/nsq/ConnectionDetails.java | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index e28c9a0..7c42fc4 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ If this second attempt fails, the call to publish will throw an NSQException. ### Failover publishing nsq-j supports failover publishing. If you specify a non-null failoverNsqd parameter or manually construct a failover balance strategy with `ListBasedBalanceStrategy#getFailoverStrategyBuilder` -In fail over mode, nsq-j prefers publishing the first element of the provided list of NSQD. It will fail over to the next nsqd if a publish fails. After the failover duration, the next publish will atempt to fail back to the first connection. Failover duration defaults to 5 min. 
+In failover mode, nsq-j prefers publishing to the first element of the provided list of NSQD. It will fail over to the next nsqd if a publish fails. After the failover duration, the next publish will attempt to reconnect to the first nsqd. Failover duration defaults to 5 min. If all nsqd are in a failed state (have all failed within the failover duration), the publish will throw an NSQException. diff --git a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java index fbab4df..99c7451 100644 --- a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java +++ b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java @@ -54,12 +54,12 @@ private boolean canAttemptRecovery() { } private boolean connectAttempt() { - if (getCon() != null) { - getCon().close(); + if (con != null) { + con.close(); } - setCon(new PubConnection(basePubSub.getClient(), this.hostAndPort, parent)); + con = new PubConnection(basePubSub.getClient(), this.hostAndPort, parent); try { - getCon().connect(basePubSub.getConfig()); + con.connect(basePubSub.getConfig()); currentState = State.CONNECTED; } catch (IOException e) { markFailure(); @@ -83,11 +83,11 @@ public int hashCode() { } public synchronized void markFailure() { - Util.closeQuietly(getCon()); - setCon(null); + Util.closeQuietly(con); + con = null; currentState = State.FAILED; failoverStart = Util.clock(); - LOGGER.warn("Failed to connect to {}, will retry after {} seconds", hostAndPort, this.failoverDurationSecs); + LOGGER.warn("Marking the connection to host {} as failed , will retry after {} seconds", hostAndPort, this.failoverDurationSecs); } public void setFailoverDurationSecs(int failoverDurationSecs) { @@ -99,10 +99,6 @@ public PubConnection getCon() { return con; } - private void setCon(PubConnection con) { - this.con = con; - } - public void clearConnection() { this.con = null; currentState = State.NOT_CONNECTED;