Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify join protocol #25

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 31 additions & 98 deletions rapid/src/main/java/com/vrg/rapid/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package com.vrg.rapid;

import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.vrg.rapid.messaging.IMessagingClient;
import com.vrg.rapid.messaging.IMessagingServer;
Expand All @@ -29,9 +27,7 @@
import com.vrg.rapid.pb.JoinStatusCode;
import com.vrg.rapid.pb.Metadata;
import com.vrg.rapid.pb.NodeId;
import com.vrg.rapid.pb.PreJoinMessage;
import com.vrg.rapid.pb.RapidRequest;
import com.vrg.rapid.pb.RapidResponse;
import io.grpc.ExperimentalApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -171,6 +166,7 @@ public static class Builder {
@Nullable private IMessagingClient messagingClient = null;
@Nullable private IMessagingServer messagingServer = null;
@Nullable private SharedResources sharedResources = null;
@Nullable private Endpoint seedAddress = null;

/**
* Instantiates a builder for a Rapid Cluster node that will listen on the given {@code listenAddress}
Expand Down Expand Up @@ -301,6 +297,11 @@ public Cluster join(final HostAndPort seedHostAndPort) throws IOException, Inter
* @throws IOException Thrown if we cannot successfully start a server
*/
Cluster join(final Endpoint seedAddress) throws IOException, InterruptedException {
if (this.seedAddress != null && !this.seedAddress.equals(seedAddress)) {
throw new JoinException("Cannot join another seed node while an attempt is in progress");
}
this.seedAddress = seedAddress;

NodeId currentIdentifier = Utils.nodeIdFromUUID(UUID.randomUUID());
sharedResources = new SharedResources(listenAddress);
messagingServer = messagingServer != null
Expand All @@ -310,134 +311,68 @@ Cluster join(final Endpoint seedAddress) throws IOException, InterruptedExceptio
? messagingClient
: new GrpcClient(listenAddress, sharedResources, settings);
messagingServer.start();

for (int attempt = 0; attempt < RETRIES; attempt++) {
try {
return joinAttempt(seedAddress, currentIdentifier, attempt);
} catch (final ExecutionException | JoinPhaseTwoException e) {
} catch (final ExecutionException e) {
LOG.error("Join message to seed {} returned an exception: {}", Utils.loggable(seedAddress), e);
} catch (final JoinPhaseOneException e) {
/*
* These are error responses from a seed node that warrant a retry.
*/
final JoinResponse result = e.getJoinPhaseOneResult();
switch (result.getStatusCode()) {
case CONFIG_CHANGED:
LOG.error("CONFIG_CHANGED received from {}. Retrying.", Utils.loggable(result.getSender()));
break;
case UUID_ALREADY_IN_RING:
LOG.error("UUID_ALREADY_IN_RING received from {}. Retrying.",
Utils.loggable(result.getSender()));
currentIdentifier = Utils.nodeIdFromUUID(UUID.randomUUID());
break;
case MEMBERSHIP_REJECTED:
case HOSTNAME_ALREADY_IN_RING:
LOG.error("Membership rejected by {}. Retrying.", Utils.loggable(result.getSender()));
break;
case VIEW_CHANGE_IN_PROGRESS:
LOG.info("Seed node {} is executing a view change. Retrying.",
Utils.loggable(result.getSender()));
break;
default:
this.seedAddress = null;
throw new JoinException("Unrecognized status code");
}
}
}
messagingServer.shutdown();
messagingClient.shutdown();
sharedResources.shutdown();
this.seedAddress = null;
throw new JoinException("Join attempt unsuccessful " + Utils.loggable(listenAddress));
}

/**
* A single attempt by a node to join a cluster. This includes phase one, where it contacts
* a seed node to receive a list of observers to contact and the configuration to join. If successful,
* it triggers phase two where it contacts those observers who then vouch for the joiner's admission
* into the cluster.
* A single attempt by a node to join a cluster.
*/
private Cluster joinAttempt(final Endpoint seedAddress, final NodeId currentIdentifier, final int attempt)
throws ExecutionException, InterruptedException {
assert messagingClient != null;
// First, get the configuration ID and the observers to contact from the seed node.
final RapidRequest preJoinMessage = Utils.toRapidRequest(PreJoinMessage.newBuilder()
.setSender(listenAddress)
.setNodeId(currentIdentifier)
.build());
final JoinResponse joinPhaseOneResult = messagingClient.sendMessage(seedAddress, preJoinMessage)
.get()
.getJoinResponse();

/*
* Either the seed node indicates it is safe to join, or it indicates that we're already
* part of the configuration (which happens due to a race condition where we retry a join
* after a timeout while the cluster has added us -- see below).
*/
if (joinPhaseOneResult.getStatusCode() != JoinStatusCode.SAFE_TO_JOIN
&& joinPhaseOneResult.getStatusCode() != JoinStatusCode.HOSTNAME_ALREADY_IN_RING) {
throw new JoinPhaseOneException(joinPhaseOneResult);
}

/*
* HOSTNAME_ALREADY_IN_RING is a special case. If the joinPhase2 request times out before
* the join confirmation arrives from an observer, a client may re-try a join by contacting
* the seed and get this response. It should simply get the configuration streamed to it.
* To do that, that client tries the join protocol but with a configuration id of -1.
*/
final long configurationToJoin = joinPhaseOneResult.getStatusCode()
== JoinStatusCode.HOSTNAME_ALREADY_IN_RING ? -1 : joinPhaseOneResult.getConfigurationId();
LOG.debug("{} is trying a join under configuration {} (attempt {})",
Utils.loggable(listenAddress), configurationToJoin, attempt);

/*
* Phase one complete. Now send a phase two message to all our observers, and if there is a valid
* response, construct a Cluster object based on it.
*/
final Optional<JoinResponse> response = sendJoinPhase2Messages(joinPhaseOneResult,
configurationToJoin, currentIdentifier)
.stream()
.filter(Objects::nonNull)
.map(RapidResponse::getJoinResponse)
.filter(r -> r.getStatusCode() == JoinStatusCode.SAFE_TO_JOIN)
.filter(r -> r.getConfigurationId() != configurationToJoin)
.findFirst();
if (response.isPresent()) {
return createClusterFromJoinResponse(response.get());
final RapidRequest joinMessage = Utils.toRapidRequest(JoinMessage.newBuilder()
.setSender(listenAddress)
.setNodeId(currentIdentifier)
.setMetadata(metadata)
.build());
final JoinResponse joinResponse = messagingClient.sendMessage(seedAddress, joinMessage)
.get()
.getJoinResponse();

if (joinResponse.getStatusCode() != JoinStatusCode.SAFE_TO_JOIN) {
throw new JoinPhaseOneException(joinResponse);
}
throw new JoinPhaseTwoException();
}

/**
* Identifies the set of observers to reach out to from the phase one message, and sends a join phase 2 message.
*/
private List<RapidResponse> sendJoinPhase2Messages(final JoinResponse joinPhaseOneResult,
final long configurationToJoin, final NodeId currentIdentifier)
throws ExecutionException, InterruptedException {
assert messagingClient != null;
// We have the list of observers. Now contact them as part of phase 2.
final List<Endpoint> observerList = joinPhaseOneResult.getEndpointsList();
final Map<Endpoint, List<Integer>> ringNumbersPerObserver = new HashMap<>(K);

// Batch together requests to the same node.
int ringNumber = 0;
for (final Endpoint observer: observerList) {
ringNumbersPerObserver.computeIfAbsent(observer, k -> new ArrayList<>()).add(ringNumber);
ringNumber++;
}

final List<ListenableFuture<RapidResponse>> responseFutures = new ArrayList<>();
for (final Map.Entry<Endpoint, List<Integer>> entry: ringNumbersPerObserver.entrySet()) {
final JoinMessage msg = JoinMessage.newBuilder()
.setSender(listenAddress)
.setNodeId(currentIdentifier)
.setMetadata(metadata)
.setConfigurationId(configurationToJoin)
.addAllRingNumber(entry.getValue()).build();
final RapidRequest request = Utils.toRapidRequest(msg);
LOG.info("{} is sending a join-p2 to {} for config {}",
Utils.loggable(listenAddress), Utils.loggable(entry.getKey()),
configurationToJoin);
final ListenableFuture<RapidResponse> call = messagingClient.sendMessage(entry.getKey(), request);
responseFutures.add(call);
}
return Futures.successfulAsList(responseFutures).get();
return createClusterFromJoinResponse(joinResponse);
}

/**
* We have a valid JoinPhase2Response. Use the retrieved configuration to construct and return a Cluster object.
* We have a valid JoinResponse. Use the retrieved configuration to construct and return a Cluster object.
*/
private Cluster createClusterFromJoinResponse(final JoinResponse response) {
assert messagingClient != null && messagingServer != null && sharedResources != null;
Expand Down Expand Up @@ -489,15 +424,13 @@ public static final class JoinException extends RuntimeException {
static final class JoinPhaseOneException extends RuntimeException {
final JoinResponse joinPhaseOneResult;

JoinPhaseOneException(final JoinResponse joinPhaseOneResult) {
this.joinPhaseOneResult = joinPhaseOneResult;
JoinPhaseOneException(final JoinResponse joinResult) {
this.joinPhaseOneResult = joinResult;
}

private JoinResponse getJoinPhaseOneResult() {
return joinPhaseOneResult;
}
}

static final class JoinPhaseTwoException extends RuntimeException {
}
}
Loading