
Consistent-hash based broadcaster #26

Draft

wants to merge 10 commits into base: master
174 changes: 73 additions & 101 deletions rapid/src/main/java/com/vrg/rapid/Cluster.java

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions rapid/src/main/java/com/vrg/rapid/ConsistentHash.java
@@ -0,0 +1,64 @@
package com.vrg.rapid;

import com.vrg.rapid.pb.Endpoint;
import net.openhft.hashing.LongHashFunction;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Simple consistent hash implementation, adapted for Endpoints.
 *
 * https://tom-e-white.com/2007/11/consistent-hashing.html
 */
public class ConsistentHash<T> {
Owner (review comment):
What is the rationale for adding another consistent hash based ring implementation? Wouldn't any of the K rings in the MembershipView data structure suffice?


    private final LongHashFunction hashFunction = LongHashFunction.xx();
    private final int numberOfReplicas;
    private final SortedMap<Long, Endpoint> circle = new TreeMap<>();

    public ConsistentHash(final int numberOfReplicas, final Collection<Endpoint> nodes) {
        this.numberOfReplicas = numberOfReplicas;

        for (final Endpoint node : nodes) {
            add(node);
        }
    }

    public void add(final Endpoint node) {
        // Each node is mapped to numberOfReplicas virtual points on the circle.
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hash(node, i), node);
        }
    }

    public void remove(final Endpoint node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hash(node, i));
        }
    }

    public boolean isEmpty() {
        return circle.isEmpty();
    }

    @Nullable
    public Endpoint get(final Endpoint key) {
        if (circle.isEmpty()) {
            return null;
        }
        long hash = hash(key, 0);
        if (!circle.containsKey(hash)) {
            // Walk clockwise to the first entry at or after the key's hash,
            // wrapping around to the start of the circle if necessary.
            final SortedMap<Long, Endpoint> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }

    private long hash(final Endpoint node, final int index) {
        return hashFunction.hashBytes(node.getHostname().asReadOnlyByteBuffer()) * 31
                + hashFunction.hashInt(node.getPort()) + hashFunction.hashInt(index);
    }

}
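
For context, here is a minimal usage sketch of the ring above; it is not part of the PR. The hostname and port values are made up, and the builder calls assume Rapid's protobuf Endpoint carries a bytes hostname and an int port, which is what the hash function above suggests.

// Illustrative only: exercising ConsistentHash with made-up endpoints.
package com.vrg.rapid;

import com.google.protobuf.ByteString;
import com.vrg.rapid.pb.Endpoint;

import java.util.Arrays;
import java.util.List;

public final class ConsistentHashExample {
    private static Endpoint endpoint(final String host, final int port) {
        return Endpoint.newBuilder()
                .setHostname(ByteString.copyFromUtf8(host))
                .setPort(port)
                .build();
    }

    public static void main(final String[] args) {
        final List<Endpoint> nodes = Arrays.asList(
                endpoint("10.0.0.1", 1234),
                endpoint("10.0.0.2", 1234),
                endpoint("10.0.0.3", 1234));
        // 100 virtual nodes per endpoint is an arbitrary choice that
        // smooths the key distribution across the circle.
        final ConsistentHash<Endpoint> ring = new ConsistentHash<>(100, nodes);

        // Any endpoint-valued key maps to the first circle entry at or
        // after its hash, wrapping around if necessary.
        final Endpoint owner = ring.get(endpoint("10.0.0.7", 5678));
        System.out.println(owner);

        // Removing a node only remaps the keys that the node owned.
        ring.remove(nodes.get(0));
    }
}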
364 changes: 364 additions & 0 deletions rapid/src/main/java/com/vrg/rapid/ConsistentHashBroadcaster.java

Large diffs are not rendered by default.
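
Since the ConsistentHashBroadcaster diff is not rendered here, the following is a rough sketch of the general technique only, not the PR's actual implementation: use the ring to pick relay nodes, have each sender forward a message to the relay responsible for it, and let the relay fan out to the full membership, spreading broadcast load across relays. The relay-selection scheme, the virtual-node count, and the send() helper are all assumptions for illustration.

// Hypothetical sketch of a consistent-hash relay broadcaster; the actual
// ConsistentHashBroadcaster in this PR is not rendered above and differs.
package com.vrg.rapid;

import com.vrg.rapid.pb.Endpoint;
import com.vrg.rapid.pb.RapidRequest;

import java.util.Collection;
import java.util.List;

final class RelayBroadcasterSketch {
    private final ConsistentHash<Endpoint> ring;
    private final Endpoint myAddr;

    RelayBroadcasterSketch(final Endpoint myAddr, final Collection<Endpoint> members) {
        this.myAddr = myAddr;
        this.ring = new ConsistentHash<>(100, members); // illustrative replica count
    }

    void broadcast(final RapidRequest msg, final List<Endpoint> allMembers) {
        // Route through the relay that owns this sender's position on the
        // ring; only relays pay the O(N) fan-out cost.
        final Endpoint relay = ring.get(myAddr);
        if (myAddr.equals(relay)) {
            for (final Endpoint recipient : allMembers) {
                send(recipient, msg);
            }
        } else {
            send(relay, msg); // forward once; the relay fans out
        }
    }

    private void send(final Endpoint recipient, final RapidRequest msg) {
        // Placeholder: in Rapid this would go through an IMessagingClient.
    }
}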

102 changes: 68 additions & 34 deletions rapid/src/main/java/com/vrg/rapid/FastPaxos.java
@@ -13,18 +13,20 @@

package com.vrg.rapid;

import com.google.protobuf.TextFormat;
import com.google.protobuf.ByteString;
import com.vrg.rapid.messaging.IBroadcaster;
import com.vrg.rapid.messaging.IMessagingClient;
import com.vrg.rapid.pb.ConsensusResponse;
import com.vrg.rapid.pb.Endpoint;
import com.vrg.rapid.pb.FastRoundPhase2bMessage;
import com.vrg.rapid.pb.Proposal;
import com.vrg.rapid.pb.RapidRequest;
import com.vrg.rapid.pb.RapidResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -47,7 +49,7 @@ class FastPaxos {
private final double jitterRate;
private final Endpoint myAddr;
private final long configurationId;
private final long membershipSize;
private final List<Endpoint> memberList;
private final Consumer<List<Endpoint>> onDecidedWrapped;
private final IBroadcaster broadcaster;
private final Map<List<Endpoint>, AtomicInteger> votesPerProposal = new HashMap<>();
@@ -59,21 +61,21 @@ class FastPaxos {
@Nullable private ScheduledFuture<?> scheduledClassicRoundTask = null;
private final ISettings settings;

FastPaxos(final Endpoint myAddr, final long configurationId, final int membershipSize,
FastPaxos(final Endpoint myAddr, final long configurationId, final List<Endpoint> memberList,
final IMessagingClient client, final IBroadcaster broadcaster,
final ScheduledExecutorService scheduledExecutorService, final Consumer<List<Endpoint>> onDecide,
final ISettings settings) {
this.myAddr = myAddr;
this.configurationId = configurationId;
this.membershipSize = membershipSize;
this.memberList = memberList;
this.broadcaster = broadcaster;
this.settings = settings;

// The rate of a random expovariate variable, used to determine a jitter over a base delay to start classic
// rounds. This determines how many classic rounds we want to start per second on average. Does not
// affect correctness of the protocol, but having too many nodes starting rounds will increase messaging load,
// especially for very large clusters.
this.jitterRate = 1 / (double) membershipSize;
this.jitterRate = 1 / (double) this.memberList.size();
this.scheduledExecutorService = scheduledExecutorService;
this.onDecidedWrapped = hosts -> {
assert !decided.get();
@@ -83,7 +85,7 @@ class FastPaxos {
}
onDecide.accept(hosts);
};
this.paxos = new Paxos(myAddr, configurationId, membershipSize, client, broadcaster, onDecidedWrapped);
this.paxos = new Paxos(myAddr, configurationId, this.memberList.size(), client, broadcaster, onDecidedWrapped);
}

/**
@@ -95,13 +97,19 @@ void propose(final List<Endpoint> proposal, final long recoveryDelayInMs) {
synchronized (paxosLock) {
paxos.registerFastRoundVote(proposal);
}
final FastRoundPhase2bMessage consensusMessage = FastRoundPhase2bMessage.newBuilder()
final int myIndex = memberList.indexOf(myAddr);
final BitSet votes = new BitSet(memberList.size());
votes.set(myIndex);
final Proposal proposalAndVotes = Proposal.newBuilder()
.setConfigurationId(configurationId)
.addAllEndpoints(proposal)
.setSender(myAddr)
.setVotes(ByteString.copyFrom(votes.toByteArray()))
.build();
final FastRoundPhase2bMessage consensusMessage = FastRoundPhase2bMessage.newBuilder()
.addProposals(proposalAndVotes)
.build();
final RapidRequest proposalMessage = Utils.toRapidRequest(consensusMessage);
broadcaster.broadcast(proposalMessage);
broadcaster.broadcast(proposalMessage, configurationId);
LOG.trace("Scheduling classic round with delay: {}", recoveryDelayInMs);
scheduledClassicRoundTask = scheduledExecutorService.schedule(this::startClassicPaxosRound, recoveryDelayInMs,
TimeUnit.MILLISECONDS);
@@ -123,36 +131,62 @@ void propose(final List<Endpoint> proposal) {
* @param proposalMessage the membership change proposal towards a configuration change.
*/
private void handleFastRoundProposal(final FastRoundPhase2bMessage proposalMessage) {
if (proposalMessage.getConfigurationId() != configurationId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Settings ID mismatch for proposal: current_config:{} proposal:{}", configurationId,
TextFormat.shortDebugString(proposalMessage));
}
if (decided.get()) {
return;
}

if (votesReceived.contains(proposalMessage.getSender())) {
return;
}
proposalMessage
.getProposalsList()
.stream()
.filter(proposal -> {
final boolean hasSameConfigurationId = proposal.getConfigurationId() == configurationId;
if (!hasSameConfigurationId) {
LOG.warn("Settings ID mismatch for proposal: current_config:{} proposal_config:{} " +
"proposal of size {}", configurationId, proposal.getConfigurationId(),
proposal.getEndpointsCount());
}
return hasSameConfigurationId;
}).forEach(proposal -> {
if (LOG.isTraceEnabled()) {
LOG.trace("Received proposal for {} nodes", proposal.getEndpointsCount());
}
// decompress all the votes contained in the proposal
// the index in the bitset corresponds to the index of an endpoint in ring 0
final AtomicInteger proposalsReceived = votesPerProposal.computeIfAbsent(
proposal.getEndpointsList(),
k -> new AtomicInteger(0));
final BitSet votes = BitSet.valueOf(proposal.getVotes().toByteArray());
for (int i = 0; i < votes.length(); i++) {
if (votes.get(i)) {
final Endpoint voter = memberList.get(i);
if (!votesReceived.contains(voter)) {
votesReceived.add(voter);
proposalsReceived.incrementAndGet();
}
}
}

if (decided.get()) {
return;
}
votesReceived.add(proposalMessage.getSender());
final AtomicInteger proposalsReceived = votesPerProposal.computeIfAbsent(proposalMessage.getEndpointsList(),
k -> new AtomicInteger(0));
final int count = proposalsReceived.incrementAndGet();
final int F = (int) Math.floor((membershipSize - 1) / 4.0); // Fast Paxos resiliency.
if (votesReceived.size() >= membershipSize - F) {
if (count >= membershipSize - F) {
LOG.trace("Decided on a view change: {}", proposalMessage.getEndpointsList());
// We have a successful proposal. Consume it.
onDecidedWrapped.accept(proposalMessage.getEndpointsList());
} else {
// fallback protocol here
LOG.trace("Fast round may not succeed for proposal: {}", proposalMessage.getEndpointsList());
// now check if we have reached agreement
final int count = votesPerProposal.get(proposal.getEndpointsList()).get();
LOG.trace("Currently {} votes for the proposal with {} nodes, {} votes in total",
count, proposal.getEndpointsCount(), votesReceived.size());
final int F = (int) Math.floor((memberList.size() - 1) / 4.0); // Fast Paxos resiliency.
if (votesReceived.size() >= memberList.size() - F) {
if (count >= memberList.size() - F) {
if (LOG.isTraceEnabled()) {
LOG.trace("Decided on a view change: {}", proposal.getEndpointsList());
}
// We have a successful proposal. Consume it.
onDecidedWrapped.accept(proposal.getEndpointsList());
} else {
// fallback protocol here
if (LOG.isTraceEnabled()) {
LOG.trace("Fast round may not succeed for proposal: {}", proposal.getEndpointsList());
}
}
}
}
});

}

/**
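
To make the FastPaxos change concrete: instead of one vote per message, the new Proposal aggregates voters as a BitSet in which bit i stands for the endpoint at index i of the member list (ring 0), serialized through a ByteString. The standalone sketch below, with illustrative names, walks through encoding a vote, merging received votes, and the fast-round quorum arithmetic: with F = floor((N - 1) / 4), a decision needs N - F matching votes, e.g. 8 of N = 10.

// Illustrative only: the vote-compression and quorum arithmetic used above.
import com.google.protobuf.ByteString;

import java.util.BitSet;

public final class VoteBitsetExample {
    public static void main(final String[] args) {
        final int membershipSize = 10;

        // A node at index 3 votes: set its bit and ship the compact byte
        // form inside the Proposal message.
        final BitSet myVote = new BitSet(membershipSize);
        myVote.set(3);
        final ByteString wireVotes = ByteString.copyFrom(myVote.toByteArray());

        // The receiver decompresses the votes and merges them with votes
        // already collected for the same proposal.
        final BitSet aggregated = BitSet.valueOf(wireVotes.toByteArray());
        aggregated.set(7); // e.g. a vote relayed from another node

        // Fast Paxos resiliency: F = floor((N - 1) / 4); a fast round
        // decides once N - F members voted for the same proposal.
        final int f = (int) Math.floor((membershipSize - 1) / 4.0);
        final int quorum = membershipSize - f;
        System.out.println(aggregated.cardinality() + " votes so far; need " + quorum);
    }
}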