diff --git a/README.md b/README.md index 0a76246..7c42fc4 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,28 @@ after nsqd responds `OK` You can batch messages manually and publish them all at once with `publish(String topic, List messages)` +### Single NSQ-d host publishing +When we have a single NSQ-d host specified (failoverNsqd is null or not specified when constructing a publisher) +we will reattempt a failed publish after 10 seconds by default. This happens in line with synchronous publishes or when +publishing buffered and the batch size is reached. + +If this second attempt fails, the call to publish will throw an NSQException. + +### Failover publishing +nsq-j supports failover publishing. If you specify a non-null failoverNsqd parameter or manually construct a failover balance strategy with `ListBasedBalanceStrategy#getFailoverStrategyBuilder` + +In fail over mode, nsq-j prefers publishing the first element of the provided list of NSQD. It will fail over to the next nsqd if a publish fails. After the failover duration, the next publish will attempt to reconnect to the first nsqd. Failover duration defaults to 5 min. + +If all nsqd are in a failed state (have all failed within the failover duration), the publish will throw an NSQException. + + +### Round-robin publishing +To use round robin, construct a balance strategy with `ListBasedBalanceStrategy#getRoundRobinStrategyBuilder` providing a list of nsqd to use. + +All the hosts that are included in the list will be added to a rotation. Each publish action is sent +to the next host in the rotation. If a publishing fails, the host is marked "dead" for the failover duration (5 min default) before +it will be added back to the rotation. If all hosts are marked dead, an NSQException will be thrown out of publish. + ## Subscribe ```java public class PubExample { diff --git a/docker-compose.yml b/docker-compose.yml index fc16a3f..a4f4227 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,13 +2,13 @@ version: "3" services: test_nsqlookupd: - image: nsqio/nsq:v0.3.8 + image: nsqio/nsq:v1.2.1 command: /nsqlookupd ports: - "127.0.0.1:4160:4160" - "127.0.0.1:4161:4161" test_nsqd: - image: nsqio/nsq:v0.3.8 + image: nsqio/nsq:v1.2.1 links: - test_nsqlookupd:test_nsqlookupd command: /nsqd --lookupd-tcp-address=test_nsqlookupd:4160 --broadcast-address=test_nsqd --tcp-address=test_nsqd:4150 @@ -16,7 +16,7 @@ services: - "127.0.0.1:4150:4150" - "127.0.0.1:4151:4151" test_nsqadmin: - image: nsqio/nsq:v0.3.8 + image: nsqio/nsq:v1.2.1 links: - test_nsqlookupd:test_nsqlookupd - test_nsqd:test_nsqd diff --git a/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java new file mode 100644 index 0000000..e50210c --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/BalanceStrategy.java @@ -0,0 +1,15 @@ +package com.sproutsocial.nsq; + +public interface BalanceStrategy { + + /** + * @throws NSQException When there are no more available connections. Should be escalated to the user of the library + */ + ConnectionDetails getConnectionDetails() throws NSQException; + + void connectionClosed(PubConnection closedCon); + + int getFailoverDurationSecs(); + + void setFailoverDurationSecs(int failoverDurationSecs); +} diff --git a/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java new file mode 100644 index 0000000..99c7451 --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/ConnectionDetails.java @@ -0,0 +1,117 @@ +package com.sproutsocial.nsq; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.slf4j.LoggerFactory.getLogger; +import static com.sproutsocial.nsq.Util.*; + +class ConnectionDetails { + private enum State { + CONNECTED, + NOT_CONNECTED, + FAILED + } + + private static final Logger LOGGER = getLogger(ConnectionDetails.class); + private final Publisher parent; + private final BasePubSub basePubSub; + HostAndPort hostAndPort; + private PubConnection con = null; + long failoverStart = 0; + private volatile int failoverDurationSecs; + private State currentState = State.NOT_CONNECTED; + + public ConnectionDetails(String hostAndPort, Publisher parent, int failoverDurationSecs, BasePubSub basePubSub) { + checkNotNull(hostAndPort); + checkNotNull(parent); + checkNotNull(basePubSub); + this.hostAndPort = HostAndPort.fromString(hostAndPort).withDefaultPort(4150); + this.parent = parent; + this.failoverDurationSecs = failoverDurationSecs; + this.basePubSub = basePubSub; + } + + /** + * @return true if this host is ready to receive data + */ + protected synchronized boolean makeReady() { + if (currentState == State.NOT_CONNECTED) { + if (parent.isStopping) { + throw new NSQException("publisher stopped"); + } + return connectAttempt(); + } else if (currentState == State.FAILED && canAttemptRecovery()) { + return connectAttempt(); + } + return currentState == State.CONNECTED; + } + + private boolean canAttemptRecovery() { + return Util.clock() - failoverStart > TimeUnit.SECONDS.toMillis(failoverDurationSecs); + } + + private boolean connectAttempt() { + if (con != null) { + con.close(); + } + con = new PubConnection(basePubSub.getClient(), this.hostAndPort, parent); + try { + con.connect(basePubSub.getConfig()); + currentState = State.CONNECTED; + } catch (IOException e) { + markFailure(); + return false; + } + LOGGER.info("publisher connected:{}", hostAndPort); + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConnectionDetails that = (ConnectionDetails) o; + return hostAndPort.equals(that.hostAndPort); + } + + @Override + public int hashCode() { + return hostAndPort.hashCode(); + } + + public synchronized void markFailure() { + Util.closeQuietly(con); + con = null; + currentState = State.FAILED; + failoverStart = Util.clock(); + LOGGER.warn("Marking the connection to host {} as failed , will retry after {} seconds", hostAndPort, this.failoverDurationSecs); + } + + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + } + + + public PubConnection getCon() { + return con; + } + + public void clearConnection() { + this.con = null; + currentState = State.NOT_CONNECTED; + } + + @Override + public String toString() { + return "ConnectionDetails{" + "parent=" + parent + + ", hostAndPort=" + hostAndPort + + ", con=" + con + + ", failoverStart=" + failoverStart + + ", failoverDurationSecs=" + failoverDurationSecs + + ", currentState=" + currentState + + '}'; + } +} diff --git a/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java new file mode 100644 index 0000000..614629b --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java @@ -0,0 +1,131 @@ +package com.sproutsocial.nsq; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static com.sproutsocial.nsq.Util.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; + +public class ListBasedBalanceStrategy extends BasePubSub implements BalanceStrategy { + private static final Logger logger = getLogger(ListBasedBalanceStrategy.class); + protected final List daemonList; + private final Publisher parent; + private final Function, ConnectionDetails> connectionDetailsSelector; + private int failoverDurationSecs = 300; + + /** + * Create a list based failover strategy that will alternate between all connected nsqd. Will reconnect to a + * disconnected or failed nsqd on the next publish that could be routed to that nsqd after the failoverDuration has + * expired (Default 5 min). + *

+ * This will throw an NSQD exception if all nsqd are in a failed state. + * + * @param nsqd a list of strings that represent HostAndPort objects. + */ + public static BiFunction getRoundRobinStrategyBuilder(List nsqd) { + return (c, p) -> buildRoundRobinStrategy(c, p, nsqd); + } + + /** + * Create a list based failover strategy that shows strong preference to the first nsqd on the list. + *

+ * On publish, find the first nsqd in this list that is in a connected or connectable state. A nsqd is connectable + * if it has previously failed more than the configured failoverDuration (Default 5 min). + *

+ * This will throw an NSQD exception if all nsqd are in a failed state. + * + * @param nsqd a list of strings that represent HostAndPort objects. + */ + public static BiFunction getFailoverStrategyBuilder(List nsqd) { + return (c, p) -> buildFailoverStrategy(c,p,nsqd); + } + + private static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List nsqd) { + return new ListBasedBalanceStrategy(client, parent, nsqd, new Function, ConnectionDetails>() { + private int nextDaemonIndex = 0; + + @Override + public ConnectionDetails apply(List daemonList) { + for (int attempts = 0; attempts < daemonList.size(); attempts++) { + ConnectionDetails candidate = daemonList.get(nextDaemonIndex); + boolean candidateReady = candidate.makeReady(); + nextDaemonIndex++; + if (nextDaemonIndex >= daemonList.size()) { + nextDaemonIndex = 0; + } + if (candidateReady) { + return candidate; + } + } + throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); + } + }); + } + + private static ListBasedBalanceStrategy buildFailoverStrategy(Client client, Publisher parent, List nsqd) { + return new ListBasedBalanceStrategy(client, parent, nsqd, daemonList -> { + for (int attempts = 0; attempts < daemonList.size(); attempts++) { + ConnectionDetails candidate = daemonList.get(attempts); + if (candidate.makeReady()) { + return candidate; + } + } + throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList); + }); + } + + public ListBasedBalanceStrategy(Client client, Publisher parent, List nsqd, Function, ConnectionDetails> connectionDetailsSelector) { + super(client); + checkNotNull(parent); + checkNotNull(nsqd); + checkNotNull(connectionDetailsSelector); + + this.parent = parent; + this.connectionDetailsSelector = connectionDetailsSelector; + List connectionDetails = new ArrayList<>(); + for (String host : nsqd) { + if (host != null) + connectionDetails.add(new ConnectionDetails(host, this.parent, this.failoverDurationSecs, this)); + } + daemonList = Collections.unmodifiableList(connectionDetails); + } + + @Override + public ConnectionDetails getConnectionDetails() { + return connectionDetailsSelector.apply(daemonList); + } + + @Override + public synchronized void connectionClosed(PubConnection closedCon) { + for (ConnectionDetails daemon : daemonList) { + if (daemon.getCon() == closedCon) { + daemon.clearConnection(); + logger.debug("removed closed publisher connection:{}", closedCon.getHost()); + } + } + } + + @Override + public int getFailoverDurationSecs() { + return this.failoverDurationSecs; + } + + @Override + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + for (ConnectionDetails connectionDetails : daemonList) { + connectionDetails.setFailoverDurationSecs(failoverDurationSecs); + } + } + + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + "daemonList=" + daemonList + ", failoverDurationSecs=" + failoverDurationSecs + '}'; + } +} diff --git a/src/main/java/com/sproutsocial/nsq/Publisher.java b/src/main/java/com/sproutsocial/nsq/Publisher.java index a1fac28..c748a67 100644 --- a/src/main/java/com/sproutsocial/nsq/Publisher.java +++ b/src/main/java/com/sproutsocial/nsq/Publisher.java @@ -1,103 +1,76 @@ package com.sproutsocial.nsq; -import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static com.sproutsocial.nsq.Util.checkArgument; import static com.sproutsocial.nsq.Util.checkNotNull; @ThreadSafe public class Publisher extends BasePubSub { - - private final HostAndPort nsqd; - private final HostAndPort failoverNsqd; - private PubConnection con; - private boolean isFailover = false; - private long failoverStart; - private int failoverDurationSecs = 300; - private final Map batchers = new HashMap(); - private ScheduledExecutorService batchExecutor; - private static final int DEFAULT_MAX_BATCH_SIZE = 16 * 1024; private static final int DEFUALT_MAX_BATCH_DELAY = 300; - private static final Logger logger = LoggerFactory.getLogger(Publisher.class); + private final BalanceStrategy balanceStrategy; + private final Map batchers = new HashMap<>(); + private ScheduledExecutorService batchExecutor; public Publisher(Client client, String nsqd, String failoverNsqd) { + this(client, getBalanceStrategyBiFunction(nsqd, failoverNsqd)); + } + + private static BiFunction getBalanceStrategyBiFunction(String nsqd, String failoverNsqd) { + Objects.requireNonNull(nsqd); + + if (failoverNsqd == null) { + return (c, p) -> new SingleNsqdBalanceStrategy(c, p, nsqd); + } else { + return ListBasedBalanceStrategy.getFailoverStrategyBuilder(Arrays.asList(nsqd, failoverNsqd)); + } + } + + public Publisher(Client client, BiFunction balanceStrategyFactory) { super(client); - this.nsqd = HostAndPort.fromString(nsqd).withDefaultPort(4150); - this.failoverNsqd = failoverNsqd != null ? HostAndPort.fromString(failoverNsqd).withDefaultPort(4150) : null; client.addPublisher(this); + this.balanceStrategy = balanceStrategyFactory.apply(client, this); } + public Publisher(String nsqd, String failoverNsqd) { this(Client.getDefaultClient(), nsqd, failoverNsqd); } public Publisher(String nsqd) { - this(nsqd, null); - } - - @GuardedBy("this") - private void checkConnection() throws IOException { - if (con == null) { - if (isStopping) { - throw new NSQException("publisher stopped"); - } - connect(nsqd); - } - else if (isFailover && Util.clock() - failoverStart > failoverDurationSecs * 1000) { - isFailover = false; - connect(nsqd); - logger.info("using primary nsqd"); - } + this(Client.getDefaultClient(), nsqd, null); } - @GuardedBy("this") - private void connect(HostAndPort host) throws IOException { - if (con != null) { - con.close(); - } - con = new PubConnection(client, host, this); - try { - con.connect(config); - } - catch(IOException e) { - con.close(); - con = null; - throw e; - } - logger.info("publisher connected:{}", host); - } public synchronized void connectionClosed(PubConnection closedCon) { - if (con == closedCon) { - con = null; - logger.debug("removed closed publisher connection:{}", closedCon.getHost()); - } + balanceStrategy.connectionClosed(closedCon); } public synchronized void publish(String topic, byte[] data) { checkNotNull(topic); checkNotNull(data); checkArgument(data.length > 0); + ConnectionDetails connectionDetails = balanceStrategy.getConnectionDetails(); try { - checkConnection(); - con.publish(topic, data); - } - catch (Exception e) { - logger.error("publish error with:{}", isFailover ? failoverNsqd : nsqd, e); - publishFailover(topic, data); + connectionDetails.getCon().publish(topic, data); + } catch (Exception e) { + connectionDetails.markFailure(); + logger.error("publish error with", e); + publish(topic, data); } } @@ -107,12 +80,12 @@ public synchronized void publishDeferred(String topic, byte[] data, long delay, checkArgument(data.length > 0); checkArgument(delay > 0); checkNotNull(unit); + ConnectionDetails connection = balanceStrategy.getConnectionDetails(); try { - checkConnection(); - con.publishDeferred(topic, data, unit.toMillis(delay)); - } - catch (Exception e) { - //deferred publish never fails over + connection.getCon().publishDeferred(topic, data, unit.toMillis(delay)); + } catch (Exception e) { + connection.markFailure(); + //deferred publish does not retry throw new NSQException("deferred publish failed", e); } } @@ -121,39 +94,13 @@ public synchronized void publish(String topic, List dataList) { checkNotNull(topic); checkNotNull(dataList); checkArgument(dataList.size() > 0); + ConnectionDetails connectionDetails = balanceStrategy.getConnectionDetails(); try { - checkConnection(); - con.publish(topic, dataList); - } - catch (Exception e) { - logger.error("publish error with:{}", isFailover ? failoverNsqd : nsqd, e); - for (byte[] data : dataList) { - publishFailover(topic, data); - } - } - } - - @GuardedBy("this") - private void publishFailover(String topic, byte[] data) { - try { - if (failoverNsqd == null) { - logger.warn("publish failed but no failoverNsqd configured. Will wait and retry once."); - Util.sleepQuietly(10000); //could do exponential backoff or make configurable - connect(nsqd); - } - else if (!isFailover) { - failoverStart = Util.clock(); - isFailover = true; - connect(failoverNsqd); - logger.info("using failover nsqd:{}", failoverNsqd); - } - con.publish(topic, data); - } - catch (Exception e) { - Util.closeQuietly(con); - con = null; - isFailover = false; - throw new NSQException("publish failed", e); + connectionDetails.getCon().publish(topic, dataList); + } catch (Exception e) { + logger.error("publish error", e); + connectionDetails.markFailure(); + publish(topic, dataList); } } @@ -189,8 +136,6 @@ synchronized ScheduledExecutorService getBatchExecutor() { public synchronized void stop() { flushBatchers(); super.stop(); - Util.closeQuietly(con); - con = null; if (batchExecutor != null) { Util.shutdownAndAwaitTermination(batchExecutor, 40, TimeUnit.MILLISECONDS); } @@ -206,11 +151,11 @@ protected void flushBatchers() { } public synchronized int getFailoverDurationSecs() { - return failoverDurationSecs; + return balanceStrategy.getFailoverDurationSecs(); } public synchronized void setFailoverDurationSecs(int failoverDurationSecs) { - this.failoverDurationSecs = failoverDurationSecs; + balanceStrategy.setFailoverDurationSecs(failoverDurationSecs); } } diff --git a/src/main/java/com/sproutsocial/nsq/SingleNsqdBalanceStrategy.java b/src/main/java/com/sproutsocial/nsq/SingleNsqdBalanceStrategy.java new file mode 100644 index 0000000..125481a --- /dev/null +++ b/src/main/java/com/sproutsocial/nsq/SingleNsqdBalanceStrategy.java @@ -0,0 +1,74 @@ +package com.sproutsocial.nsq; + +import org.slf4j.Logger; + +import java.util.concurrent.TimeUnit; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * A single NSQD balance strategy will attempt to publish to the single known NSQD. If that fails on a first attempt + * it will *block* for the failover duration (default 10 seconds) and attempt to reconnect. + */ +public class SingleNsqdBalanceStrategy extends BasePubSub implements BalanceStrategy { + private static final Logger logger = getLogger(SingleNsqdBalanceStrategy.class); + protected final ConnectionDetails connectionDetails; + private int failoverDurationSecs = 10; + + public SingleNsqdBalanceStrategy(Client client, Publisher parent, String nsqd) { + super(client); + logger.warn("You are configured to use a singe NSQD balance strategy. This has both availability and correctness issues. " + + "Nsq-j is sleeping for failover duration on a failed publish inside a critical lock section impacting all threads calling to publish. " + + "This may appear to be lock starvation. " + + "The client is also not resilient to failures in this mode, a single outage can result in dataloss and crashing (slowly). "+ + "Please use failover or round robin balance strategy to avoid these issues"); + connectionDetails = new ConnectionDetails(nsqd, + parent, + this.failoverDurationSecs, + this); + } + + @Override + public ConnectionDetails getConnectionDetails() { + if (!connectionDetails.makeReady()) { + logger.warn("We aren't able to connect just now, so we are going to sleep for {} seconds", failoverDurationSecs); + Util.sleepQuietly(TimeUnit.SECONDS.toMillis(failoverDurationSecs)); + if (connectionDetails.makeReady()) + return connectionDetails; + else { + throw new NSQException("Unable to connect"); + } + } else { + return connectionDetails; + } + + } + + @Override + public synchronized void connectionClosed(PubConnection closedCon) { + if (connectionDetails.getCon() == closedCon) { + connectionDetails.clearConnection(); + logger.debug("removed closed publisher connection:{}", closedCon.getHost()); + } + } + + @Override + public int getFailoverDurationSecs() { + return failoverDurationSecs; + } + + @Override + public void setFailoverDurationSecs(int failoverDurationSecs) { + this.failoverDurationSecs = failoverDurationSecs; + this.connectionDetails.setFailoverDurationSecs(failoverDurationSecs); + } + + + @Override + public String toString() { + return "SingleNsqdBallenceStrategy{" + + "daemon=" + connectionDetails + + ", failoverDurationSecs=" + failoverDurationSecs + + '}'; + } +} diff --git a/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java index a3c34ab..d1894bd 100644 --- a/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/BaseDockerTestIT.java @@ -7,7 +7,9 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -61,7 +63,7 @@ protected void send(String topic, List msgs, double delayChance, int max } publisher.publish(topic, msg.getBytes()); if (++count % 10 == 0) { - System.out.println("sent " + count + " msgs"); + LOGGER.info("sent {} msgs",count); } } } @@ -103,6 +105,14 @@ protected void validateFromParticularNsqd(List receivedMessages, int } } + protected Map> mapByNsqd(List messages) { + Map> byNsqd = messages.stream().collect(Collectors.groupingBy(e -> e.getConnection().getHost().toString())); + for (List m : byNsqd.values()) { + m.sort(Comparator.comparing(a -> new String(a.getData()))); + } + return byNsqd; + } + protected Publisher primaryOnlyPublisher() { return new Publisher(client, cluster.getNsqdNodes().get(0).getTcpHostAndPort().toString(), null); } @@ -113,6 +123,13 @@ protected Publisher backupPublisher() { return publisher; } + protected Publisher roundRobinPublisher() { + List nsqdHosts = cluster.getNsqdNodes().stream().map(e -> e.getTcpHostAndPort().toString()).collect(Collectors.toList()); + Publisher publisher = new Publisher(client, ListBasedBalanceStrategy.getRoundRobinStrategyBuilder(nsqdHosts)); + publisher.setFailoverDurationSecs(5); + return publisher; + } + public void validateReceivedAllMessages(List expected, List actual, boolean validateOrder) { List actualMessages = actual.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); List expectedCopy = new ArrayList<>(expected); diff --git a/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java new file mode 100644 index 0000000..fc3cd2e --- /dev/null +++ b/src/test/java/com/sproutsocial/nsq/RoundRobinDockerTestIT.java @@ -0,0 +1,143 @@ +package com.sproutsocial.nsq; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.slf4j.LoggerFactory.getLogger; + +public class RoundRobinDockerTestIT extends BaseDockerTestIT { + private static final Logger LOGGER = getLogger(RoundRobinDockerTestIT.class); + private Subscriber subscriber; + private Publisher publisher; + private TestMessageHandler handler; + + @Override + public void setup() { + super.setup(); + Util.sleepQuietly(500); + //This needs to be crazy long because it can take up to 1 min for the connections in the subscriber to timeout and reconnect. + handler = new TestMessageHandler(60_000); + subscriber = new Subscriber(client, 1, 50, cluster.getLookupNode().getHttpHostAndPort().toString()); + subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); + publisher = roundRobinPublisher(); + } + + @Override + public void teardown() throws InterruptedException { + subscriber.stop(); + if (publisher != null) { + publisher.stop(); + } + super.teardown(); + } + + + @Test + public void test_happyPath() { + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + } + + private void validateMessagesSentRoundRobin(List nsqdNodes, int count, List messages, List receivedMessages, int nodeOffset) { + Map> messagesByNsqd = mapByNsqd(receivedMessages); + for (int i = 0; i < count; i++) { + String nsqdHst = nsqdNodes.get((i + nodeOffset) % nsqdNodes.size()).getTcpHostAndPort().toString(); + int expectedIndex = i / nsqdNodes.size(); + String nsqdMessage = new String(messagesByNsqd.get(nsqdHst).get(expectedIndex).getData()); + assertEquals("For message " + i + " expect it to be from nsqd " + nsqdHst, messages.get(i), nsqdMessage); + } + } + + @Test + public void test_singleNodeFailed() { + List nsqdNodes = cluster.getNsqdNodes(); + cluster.disconnectNetworkFor(nsqdNodes.get(2)); + publishAndValidateRoundRobinForNodes(nsqdNodes.subList(0, 2), 0); + } + + @Test + public void test_SingleNodeFailsAndRecovers() { + List nsqdNodes = cluster.getNsqdNodes(); + cluster.disconnectNetworkFor(nsqdNodes.get(2)); + publishAndValidateRoundRobinForNodes(nsqdNodes.subList(0, 2), 0); + cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(2)); + Util.sleepQuietly(5000); + publishAndValidateRoundRobinForNodes(nsqdNodes, 2); + } + + private void publishAndValidateRoundRobinForNodes(List nsqdNodes, int nodeOffset) { + int count = 10 * nsqdNodes.size(); + List messages = messages(count, 40); + send(topic, messages, 0.5f, 10, publisher); + List receivedMessages = handler.drainMessagesOrTimeOut(messages.size()); + validateReceivedAllMessages(messages, receivedMessages, false); + validateMessagesSentRoundRobin(nsqdNodes, count, messages, receivedMessages, nodeOffset); + } + + @Test(timeout = 500) + public void test_allNodesDown_throwsException() { + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { + cluster.disconnectNetworkFor(nsqdNode); + } + int count = 1; + List messages = messages(count, 40); + Assert.assertThrows(NSQException.class, () -> send(topic, messages, 0.5f, 10, publisher)); + } + + + @Test() + public void test_allNodesDown_LaterRecovers() { + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { + cluster.disconnectNetworkFor(nsqdNode); + } + int count = 50; + List messages = messages(count, 40); + Assert.assertThrows(NSQException.class, () -> send(topic, messages, 0.5f, 10, publisher)); + LOGGER.info(subscriber.toString()); + + for (NsqDockerCluster.NsqdNode nsqdNode : cluster.getNsqdNodes()) { + cluster.reconnectNetworkFor(nsqdNode); + } + + Util.sleepQuietly(6000); + + // Explicitly recreate the subscrber to get fresh connections. Otherwise we would need to wait for the socket timeout of 60 seconds + subscriber.stop(); + subscriber = new Subscriber(client, 1, 50, cluster.getLookupNode().getHttpHostAndPort().toString()); + subscriber.subscribe(topic, "tail" + System.currentTimeMillis(), handler); + + Assert.assertTrue(handler.drainMessages(1).isEmpty()); + + send(topic, messages, 0.1f, 50, publisher); + List nsqMessages = handler.drainMessagesOrTimeOut(count); + validateReceivedAllMessages(messages, nsqMessages, false); + } + + + @Test() + public void test_twoNodesDown_LaterRecovers() { + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + + cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(0)); + cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(1)); + + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes().subList(2, 3), 0); + + cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(0)); + cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(1)); + + Util.sleepQuietly(6000); + + Assert.assertTrue(handler.drainMessages(1).isEmpty()); + + publishAndValidateRoundRobinForNodes(cluster.getNsqdNodes(), 0); + } + + +} diff --git a/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java b/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java index c827450..69aaef8 100644 --- a/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java +++ b/src/test/java/com/sproutsocial/nsq/SubscriberFocusedDockerTestIT.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; public class SubscriberFocusedDockerTestIT extends BaseDockerTestIT { private Publisher publisher; diff --git a/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java b/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java index 301c7d2..87af684 100644 --- a/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java +++ b/src/test/java/com/sproutsocial/nsq/TestMessageHandler.java @@ -4,7 +4,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -32,7 +31,7 @@ public List drainMessagesOrTimeOut(int size) { return drainMessagesOrTimeOut(size, timeoutMillis); } - public List drainMessagesOrTimeOut(int size, int timeoutMillis) { + public List drainMessagesOrTimeOut(int size, long timeoutMillis) { long timeoutTime = System.currentTimeMillis() + timeoutMillis; while (receivedMessages.size() < size && System.currentTimeMillis() < timeoutTime) { Util.sleepQuietly(50);