Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIS-4916] Connection management strategy #40

Merged
merged 13 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,28 @@ after nsqd responds `OK`
You can batch messages manually and publish them all at once with
`publish(String topic, List<byte[]> messages)`

### Single NSQ-d host publishing
erik-helleren marked this conversation as resolved.
Show resolved Hide resolved
When we have a single NSQ-d host specified (failoverNsqd is null or not specified when constructing a publisher)
we will reattempt a failed publish after 10 seconds by default. This happens in line with synchronous publishes or when
publishing buffered and the batch size is reached.

If this second attempt fails, the call to publish will throw an NSQException.

### Failover publishing
nsq-j supports failover publishing. If you specify a non-null failoverNsqd parameter or manually construct a failover balance strategy with `ListBasedBalanceStrategy#getFailoverStrategyBuilder`

In fail over mode, nsq-j prefers publishing the first element of the provided list of NSQD. It will fail over to the next nsqd if a publish fails. After the failover duration, the next publish will attempt to reconnect to the first nsqd. Failover duration defaults to 5 min.

If all nsqd are in a failed state (have all failed within the failover duration), the publish will throw an NSQException.


### Round-robin publishing
To use round robin, construct a balance strategy with `ListBasedBalanceStrategy#getRoundRobinStrategyBuilder` providing a list of nsqd to use.

All the hosts that are included in the list will be added to a rotation. Each publish action is sent
to the next host in the rotation. If a publishing fails, the host is marked "dead" for the failover duration (5 min default) before
it will be added back to the rotation. If all hosts are marked dead, an NSQException will be thrown out of publish.

## Subscribe
```java
public class PubExample {
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ version: "3"

services:
test_nsqlookupd:
image: nsqio/nsq:v0.3.8
image: nsqio/nsq:v1.2.1
command: /nsqlookupd
ports:
- "127.0.0.1:4160:4160"
- "127.0.0.1:4161:4161"
test_nsqd:
image: nsqio/nsq:v0.3.8
image: nsqio/nsq:v1.2.1
links:
- test_nsqlookupd:test_nsqlookupd
command: /nsqd --lookupd-tcp-address=test_nsqlookupd:4160 --broadcast-address=test_nsqd --tcp-address=test_nsqd:4150
ports:
- "127.0.0.1:4150:4150"
- "127.0.0.1:4151:4151"
test_nsqadmin:
image: nsqio/nsq:v0.3.8
image: nsqio/nsq:v1.2.1
links:
- test_nsqlookupd:test_nsqlookupd
- test_nsqd:test_nsqd
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/sproutsocial/nsq/BalanceStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.sproutsocial.nsq;

public interface BalanceStrategy {

/**
* @throws NSQException When there are no more available connections. Should be escalated to the user of the library
*/
ConnectionDetails getConnectionDetails() throws NSQException;

void connectionClosed(PubConnection closedCon);

int getFailoverDurationSecs();

void setFailoverDurationSecs(int failoverDurationSecs);
}
117 changes: 117 additions & 0 deletions src/main/java/com/sproutsocial/nsq/ConnectionDetails.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.sproutsocial.nsq;

import org.slf4j.Logger;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static org.slf4j.LoggerFactory.getLogger;
import static com.sproutsocial.nsq.Util.*;

class ConnectionDetails {
private enum State {
CONNECTED,
NOT_CONNECTED,
FAILED
}

private static final Logger LOGGER = getLogger(ConnectionDetails.class);
private final Publisher parent;
private final BasePubSub basePubSub;
HostAndPort hostAndPort;
private PubConnection con = null;
long failoverStart = 0;
private volatile int failoverDurationSecs;
private State currentState = State.NOT_CONNECTED;

public ConnectionDetails(String hostAndPort, Publisher parent, int failoverDurationSecs, BasePubSub basePubSub) {
checkNotNull(hostAndPort);
checkNotNull(parent);
checkNotNull(basePubSub);
this.hostAndPort = HostAndPort.fromString(hostAndPort).withDefaultPort(4150);
this.parent = parent;
this.failoverDurationSecs = failoverDurationSecs;
this.basePubSub = basePubSub;
}

/**
* @return true if this host is ready to receive data
*/
protected synchronized boolean makeReady() {
if (currentState == State.NOT_CONNECTED) {
if (parent.isStopping) {
throw new NSQException("publisher stopped");
}
return connectAttempt();
} else if (currentState == State.FAILED && canAttemptRecovery()) {
return connectAttempt();
}
return currentState == State.CONNECTED;
}

private boolean canAttemptRecovery() {
return Util.clock() - failoverStart > TimeUnit.SECONDS.toMillis(failoverDurationSecs);
}

private boolean connectAttempt() {
if (con != null) {
con.close();
}
con = new PubConnection(basePubSub.getClient(), this.hostAndPort, parent);
try {
con.connect(basePubSub.getConfig());
currentState = State.CONNECTED;
} catch (IOException e) {
markFailure();
return false;
}
LOGGER.info("publisher connected:{}", hostAndPort);
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConnectionDetails that = (ConnectionDetails) o;
return hostAndPort.equals(that.hostAndPort);
}

@Override
public int hashCode() {
return hostAndPort.hashCode();
}

public synchronized void markFailure() {
Util.closeQuietly(con);
con = null;
currentState = State.FAILED;
failoverStart = Util.clock();
LOGGER.warn("Marking the connection to host {} as failed , will retry after {} seconds", hostAndPort, this.failoverDurationSecs);
}

public void setFailoverDurationSecs(int failoverDurationSecs) {
this.failoverDurationSecs = failoverDurationSecs;
}


public PubConnection getCon() {
return con;
}

public void clearConnection() {
this.con = null;
currentState = State.NOT_CONNECTED;
}

@Override
public String toString() {
return "ConnectionDetails{" + "parent=" + parent +
", hostAndPort=" + hostAndPort +
", con=" + con +
", failoverStart=" + failoverStart +
", failoverDurationSecs=" + failoverDurationSecs +
", currentState=" + currentState +
'}';
}
}
131 changes: 131 additions & 0 deletions src/main/java/com/sproutsocial/nsq/ListBasedBalanceStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.sproutsocial.nsq;

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import static com.sproutsocial.nsq.Util.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;

public class ListBasedBalanceStrategy extends BasePubSub implements BalanceStrategy {
private static final Logger logger = getLogger(ListBasedBalanceStrategy.class);
protected final List<ConnectionDetails> daemonList;
private final Publisher parent;
private final Function<List<ConnectionDetails>, ConnectionDetails> connectionDetailsSelector;
private int failoverDurationSecs = 300;

/**
* Create a list based failover strategy that will alternate between all connected nsqd. Will reconnect to a
* disconnected or failed nsqd on the next publish that could be routed to that nsqd after the failoverDuration has
* expired (Default 5 min).
* <p>
* This will throw an NSQD exception if all nsqd are in a failed state.
*
* @param nsqd a list of strings that represent HostAndPort objects.
*/
public static BiFunction<Client, Publisher, BalanceStrategy> getRoundRobinStrategyBuilder(List<String> nsqd) {
return (c, p) -> buildRoundRobinStrategy(c, p, nsqd);
}

/**
* Create a list based failover strategy that shows strong preference to the first nsqd on the list.
* <p>
* On publish, find the first nsqd in this list that is in a connected or connectable state. A nsqd is connectable
* if it has previously failed more than the configured failoverDuration (Default 5 min).
* <p>
* This will throw an NSQD exception if all nsqd are in a failed state.
*
* @param nsqd a list of strings that represent HostAndPort objects.
*/
public static BiFunction<Client, Publisher, BalanceStrategy> getFailoverStrategyBuilder(List<String> nsqd) {
return (c, p) -> buildFailoverStrategy(c,p,nsqd);
}

private static ListBasedBalanceStrategy buildRoundRobinStrategy(Client client, Publisher parent, List<String> nsqd) {
return new ListBasedBalanceStrategy(client, parent, nsqd, new Function<List<ConnectionDetails>, ConnectionDetails>() {
private int nextDaemonIndex = 0;

@Override
public ConnectionDetails apply(List<ConnectionDetails> daemonList) {
for (int attempts = 0; attempts < daemonList.size(); attempts++) {
ConnectionDetails candidate = daemonList.get(nextDaemonIndex);
boolean candidateReady = candidate.makeReady();
nextDaemonIndex++;
if (nextDaemonIndex >= daemonList.size()) {
nextDaemonIndex = 0;
}
if (candidateReady) {
return candidate;
}
}
throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList);
}
});
}

private static ListBasedBalanceStrategy buildFailoverStrategy(Client client, Publisher parent, List<String> nsqd) {
return new ListBasedBalanceStrategy(client, parent, nsqd, daemonList -> {
for (int attempts = 0; attempts < daemonList.size(); attempts++) {
ConnectionDetails candidate = daemonList.get(attempts);
if (candidate.makeReady()) {
return candidate;
}
}
throw new NSQException("publish failed: Unable to establish a connection with any NSQ host: " + daemonList);
});
}

public ListBasedBalanceStrategy(Client client, Publisher parent, List<String> nsqd, Function<List<ConnectionDetails>, ConnectionDetails> connectionDetailsSelector) {
super(client);
checkNotNull(parent);
checkNotNull(nsqd);
checkNotNull(connectionDetailsSelector);

this.parent = parent;
this.connectionDetailsSelector = connectionDetailsSelector;
List<ConnectionDetails> connectionDetails = new ArrayList<>();
for (String host : nsqd) {
if (host != null)
connectionDetails.add(new ConnectionDetails(host, this.parent, this.failoverDurationSecs, this));
}
daemonList = Collections.unmodifiableList(connectionDetails);
}

@Override
public ConnectionDetails getConnectionDetails() {
return connectionDetailsSelector.apply(daemonList);
}

@Override
public synchronized void connectionClosed(PubConnection closedCon) {
for (ConnectionDetails daemon : daemonList) {
if (daemon.getCon() == closedCon) {
daemon.clearConnection();
logger.debug("removed closed publisher connection:{}", closedCon.getHost());
}
}
}

@Override
public int getFailoverDurationSecs() {
return this.failoverDurationSecs;
}

@Override
public void setFailoverDurationSecs(int failoverDurationSecs) {
this.failoverDurationSecs = failoverDurationSecs;
for (ConnectionDetails connectionDetails : daemonList) {
connectionDetails.setFailoverDurationSecs(failoverDurationSecs);
}
}


@Override
public String toString() {
return this.getClass().getSimpleName() + "{" + "daemonList=" + daemonList + ", failoverDurationSecs=" + failoverDurationSecs + '}';
}
}
Loading