Skip to content

Commit

Permalink
Add New Node Locator - Round Robin
Browse files Browse the repository at this point in the history
Adds a new round-robin based node locator. A round-robin node locator
is useful for a couple of usecases:

* Read-only applications that want to round-robin across nodes to
  spread load across memcached instances.
* Applications that are pointing to mcrouter endpoints vs. memcached
  endpoints. mcrouter is a memcached compliant protocol router for
  scaling memcached (see https://github.com/facebook/mcrouter).
  • Loading branch information
amcrn committed Nov 3, 2017
1 parent c232307 commit a50290f
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ public NodeLocator createLocator(List<MemcachedNode> nodes) {
return new ArrayModNodeLocator(nodes, getHashAlg());
case CONSISTENT:
return new KetamaNodeLocator(nodes, getHashAlg());
case ROUND_ROBIN:
return new RoundRobinLocator(nodes);
default:
throw new IllegalStateException("Unhandled locator type: " + locator);
}
Expand Down Expand Up @@ -493,6 +495,10 @@ public static enum Locator {
* algorithm.
*/
CONSISTENT,
/**
* Round robin algorithm.
*/
ROUND_ROBIN,
/**
* VBucket support.
*/
Expand Down
112 changes: 112 additions & 0 deletions src/main/java/net/spy/memcached/RoundRobinLocator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package net.spy.memcached;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import net.spy.memcached.MemcachedNode;
import net.spy.memcached.MemcachedNodeROImpl;
import net.spy.memcached.NodeLocator;


/**
* NodeLocator implementation that round-robins across active nodes.
*/
public class RoundRobinLocator implements NodeLocator {

private int nodeIndex;
private MemcachedNode[] nodes;

public RoundRobinLocator(List<MemcachedNode> n) {
super();
nodes = n.toArray(new MemcachedNode[n.size()]);
}

private RoundRobinLocator(MemcachedNode[] n) {
super();
nodes = n;
}

/**
* @param k Input key (which is ignored in round robin)
* @return Next active node. If none of the nodes are active, the next node in
* the list is returned. Never returns null.
*/
@Override
public synchronized MemcachedNode getPrimary(String k) {
int i;
for (i = nodeIndex; !nodes[i % nodes.length].isActive()
&& i < nodeIndex + nodes.length; i++) {}

nodeIndex = (i + 1) % nodes.length;
return nodes[i % nodes.length];
}

@Override
public Iterator<MemcachedNode> getSequence(String k) {
return new NodeIterator(nodeIndex);
}

@Override
public Collection<MemcachedNode> getAll() {
return Arrays.asList(nodes);
}

@Override
public NodeLocator getReadonlyCopy() {
MemcachedNode[] n = new MemcachedNode[nodes.length];
for (int i = 0; i < nodes.length; i++) {
n[i] = new MemcachedNodeROImpl(nodes[i]);
}
return new RoundRobinLocator(n);
}

@Override
public void updateLocator(List<MemcachedNode> newNodes) {
this.nodes = newNodes.toArray(new MemcachedNode[newNodes.size()]);
}

class NodeIterator implements Iterator<MemcachedNode> {

private final int start;
private int next = 0;

public NodeIterator(int keyStart) {
start = keyStart;
next = start;
computeNext();
assert next >= 0 || nodes.length == 1 : "Starting sequence at "
+ start + " of " + nodes.length + " next is " + next;
}

@Override
public boolean hasNext() {
return next >= 0;
}

private void computeNext() {
if (++next >= nodes.length) {
next = 0;
}
if (next == start) {
next = -1;
}
}

@Override
public MemcachedNode next() {
try {
return nodes[next];
} finally {
computeNext();
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Can't remove a node");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void runSequenceAssertion(NodeLocator l, String k, int... seq) {
assertEquals("Incorrect sequence size for " + k, seq.length, pos);
}

public final void testCloningGetPrimary() {
public void testCloningGetPrimary() {
setupNodes(5);
assertTrue(locator.getReadonlyCopy().getPrimary("hi")
instanceof MemcachedNodeROImpl);
Expand Down
121 changes: 121 additions & 0 deletions src/test/java/net/spy/memcached/RoundRobinLocatorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package net.spy.memcached;

import java.util.Arrays;
import java.util.Collection;


/**
* Test the RoundRobinLocatorTest.
*/
public class RoundRobinLocatorTest extends AbstractNodeLocationCase {

private void setActive(boolean value, int ... nodes) {
for (int n : nodes) {
nodeMocks[n].expects(atLeastOnce()).method("isActive")
.will(returnValue(value));
}
}

@Override
protected void setupNodes(int n) {
super.setupNodes(n);
locator = new RoundRobinLocator(Arrays.asList(nodes));
}

public void testPrimarySingleNodeActive() throws Exception {
setupNodes(1);
setActive(true, 0);
assertSame(nodes[0], locator.getPrimary("a"));
assertSame(nodes[0], locator.getPrimary("b"));
assertSame(nodes[0], locator.getPrimary("c"));
}

public void testPrimarySingleNodeDown() throws Exception {
setupNodes(1);
setActive(false, 0);
assertSame(nodes[0], locator.getPrimary("a"));
assertSame(nodes[0], locator.getPrimary("b"));
assertSame(nodes[0], locator.getPrimary("c"));
}

public void testPrimaryMultiNodeOneDown() throws Exception {
setupNodes(2);
setActive(false, 0);
setActive(true, 1);
assertSame(nodes[1], locator.getPrimary("a"));
assertSame(nodes[1], locator.getPrimary("b"));
assertSame(nodes[1], locator.getPrimary("c"));
}

public void testPrimaryMultiMixedDown() throws Exception {
setupNodes(4);
setActive(false, 0, 2);
setActive(true, 1, 3);
assertSame(nodes[1], locator.getPrimary("a"));
assertSame(nodes[3], locator.getPrimary("b"));
assertSame(nodes[1], locator.getPrimary("c"));
assertSame(nodes[3], locator.getPrimary("a"));
}

public void testPrimaryMultiNodeAllActive() throws Exception {
setupNodes(4);
setActive(true, 0, 1, 2, 3);
assertSame(nodes[0], locator.getPrimary("a"));
assertSame(nodes[1], locator.getPrimary("b"));
assertSame(nodes[2], locator.getPrimary("c"));
assertSame(nodes[3], locator.getPrimary("d"));
assertSame(nodes[0], locator.getPrimary("e"));
assertSame(nodes[1], locator.getPrimary("f"));
assertSame(nodes[2], locator.getPrimary("g"));
assertSame(nodes[3], locator.getPrimary("h"));
assertSame(nodes[0], locator.getPrimary("i"));
}

public void testPrimaryMultiNodeAllDown() throws Exception {
setupNodes(4);
setActive(false, 0, 1, 2, 3);
assertSame(nodes[0], locator.getPrimary("a"));
assertSame(nodes[1], locator.getPrimary("b"));
assertSame(nodes[2], locator.getPrimary("c"));
assertSame(nodes[3], locator.getPrimary("d"));
assertSame(nodes[0], locator.getPrimary("e"));
assertSame(nodes[1], locator.getPrimary("f"));
assertSame(nodes[2], locator.getPrimary("g"));
assertSame(nodes[3], locator.getPrimary("h"));
assertSame(nodes[0], locator.getPrimary("i"));
}

public void testPrimaryClone() throws Exception {
setupNodes(2);
setActive(true, 0);
assertEquals(nodes[0].toString(),
locator.getReadonlyCopy().getPrimary("a").toString());
assertEquals(nodes[0].toString(),
locator.getReadonlyCopy().getPrimary("b").toString());
}

public void testAll() throws Exception {
setupNodes(4);
Collection<MemcachedNode> all = locator.getAll();
assertEquals(4, all.size());
assertTrue(all.contains(nodes[0]));
assertTrue(all.contains(nodes[1]));
assertTrue(all.contains(nodes[2]));
assertTrue(all.contains(nodes[3]));
}

public void testAllClone() throws Exception {
setupNodes(4);
Collection<MemcachedNode> all = locator.getReadonlyCopy().getAll();
assertEquals(4, all.size());
}

@Override
public final void testCloningGetPrimary() {
setupNodes(5);
setActive(true, 0);
assertTrue(locator.getReadonlyCopy().getPrimary("hi")
instanceof MemcachedNodeROImpl);
}

}

0 comments on commit a50290f

Please sign in to comment.