Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] appoint leader during runTime #1635

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 65 additions & 13 deletions aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import io.aeron.archive.client.AeronArchive;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.AdminRequestDecoder;
import io.aeron.cluster.codecs.AdminRequestType;
import io.aeron.cluster.codecs.BackupQueryDecoder;
import io.aeron.cluster.codecs.HeartbeatRequestDecoder;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.StandbySnapshotDecoder;
import io.aeron.cluster.codecs.*;
import io.aeron.cluster.codecs.mark.ClusterComponentType;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.ClusterCounters;
Expand All @@ -40,11 +35,7 @@
import io.aeron.driver.status.DutyCycleStallTracker;
import io.aeron.exceptions.ConcurrentConcludeException;
import io.aeron.exceptions.ConfigurationException;
import io.aeron.security.Authenticator;
import io.aeron.security.AuthenticatorSupplier;
import io.aeron.security.AuthorisationService;
import io.aeron.security.AuthorisationServiceSupplier;
import io.aeron.security.DefaultAuthenticatorSupplier;
import io.aeron.security.*;
import io.aeron.version.Versioned;
import org.agrona.*;
import org.agrona.concurrent.*;
Expand All @@ -69,7 +60,6 @@
import static io.aeron.CommonContext.*;
import static io.aeron.cluster.ConsensusModule.Configuration.CLUSTER_CLIENT_TIMEOUT_COUNT_TYPE_ID;
import static io.aeron.cluster.ConsensusModule.Configuration.CLUSTER_NODE_ROLE_TYPE_ID;
import static io.aeron.cluster.ConsensusModule.Configuration.COMMIT_POSITION_TYPE_ID;
import static io.aeron.cluster.ConsensusModule.Configuration.*;
import static org.agrona.BitUtil.findNextPositivePowerOfTwo;
import static org.agrona.SystemUtil.*;
Expand Down Expand Up @@ -130,7 +120,12 @@ public enum State
/**
* Terminal state.
*/
CLOSED(6);
CLOSED(6),

/**
* Appoint new leader wait election.
*/
APPOINT_LEADER(7);

static final State[] STATES = values();

Expand Down Expand Up @@ -324,6 +319,7 @@ public AgentInvoker conductorAgentInvoker()
/**
* {@inheritDoc}
*/
@Override
public void close()
{
CloseHelper.closeAll(conductorRunner, conductorInvoker);
Expand Down Expand Up @@ -955,6 +951,17 @@ public static final class Configuration
public static final String CONSENSUS_MODULE_EXTENSION_CLASS_NAME_PROP_NAME =
"aeron.cluster.consensus.module.extension";

/**
* Timeout for appointed leader status.
*/
public static final String APPOINTED_LEADER_TIMEOUT_PROP_NAME = "aeron.cluster.appointedLeader.timeout";

/**
* Timeout for appointed leader status.
*/
public static final long APPOINTED_LEADER_TIMEOUT_DEFAULT_NS = TimeUnit.SECONDS.toNanos(5);


/**
* The value {@link #CLUSTER_INGRESS_FRAGMENT_LIMIT_DEFAULT} or system property
* {@link #CLUSTER_INGRESS_FRAGMENT_LIMIT_PROP_NAME} if set.
Expand Down Expand Up @@ -1439,6 +1446,18 @@ public static boolean acceptStandbySnapshots()
return Boolean.getBoolean(CLUSTER_ACCEPT_STANDBY_SNAPSHOTS_PROP_NAME);
}

/**
* Timeout for appointed leader status.
*
* @return timeout in nanoseconds to wait for canvass request
* @see #SESSION_TIMEOUT_PROP_NAME
*/
public static long appointedLeaderTimeoutNs()
{
return getDurationInNanos(APPOINTED_LEADER_TIMEOUT_PROP_NAME, APPOINTED_LEADER_TIMEOUT_DEFAULT_NS);
}


/**
* Create a new {@link ConsensusModuleExtension} based on the configured
* {@link #CONSENSUS_MODULE_EXTENSION_CLASS_NAME_PROP_NAME}.
Expand Down Expand Up @@ -1474,6 +1493,7 @@ public static ConsensusModuleExtension newConsensusModuleExtension()
public static final class Context implements Cloneable
{
private static final VarHandle IS_CONCLUDED_VH;

static
{
try
Expand Down Expand Up @@ -1586,11 +1606,14 @@ public static final class Context implements Cloneable
private ConsensusModuleStateExport boostrapState = null;
private boolean acceptStandbySnapshots = Configuration.acceptStandbySnapshots();

private long appointedLeaderTimeoutNs = Configuration.appointedLeaderTimeoutNs();

/**
* Perform a shallow copy of the object.
*
* @return a shallow copy of the object.
*/
@Override
public Context clone()
{
try
Expand Down Expand Up @@ -4295,6 +4318,33 @@ public Context leadershipTermIdCounter(final Counter leadershipTermId)
return this;
}

/**
* Timeout for appointed leader status.
*
* @param appointedLeaderTimeoutNs to wait for canvass request in appointed leader statuis.
* @see Configuration#APPOINTED_LEADER_TIMEOUT_PROP_NAME
* @see Configuration#APPOINTED_LEADER_TIMEOUT_DEFAULT_NS
* @return this for a fluentAPI.
*/
public Context appointedLeaderTimeoutNs(final long appointedLeaderTimeoutNs)
{
this.appointedLeaderTimeoutNs = appointedLeaderTimeoutNs;
return this;
}

/**
* Timeout for appointed leader status.
*
* @return appointedLeaderTimeoutNs to wait for canvass request in appointed leader statuis.
* @see Configuration#APPOINTED_LEADER_TIMEOUT_PROP_NAME
* @see Configuration#APPOINTED_LEADER_TIMEOUT_DEFAULT_NS
*/
public long appointedLeaderTimeoutNs()
{
return appointedLeaderTimeoutNs;
}


/**
* Delete the cluster directory.
*/
Expand Down Expand Up @@ -4453,6 +4503,7 @@ private void validateLogChannel()
/**
* {@inheritDoc}
*/
@Override
public String toString()
{
return "ConsensusModule.Context" +
Expand Down Expand Up @@ -4546,6 +4597,7 @@ public String toString()
/**
* {@inheritDoc}
*/
@Override
public String toString()
{
return "ConsensusModule{" +
Expand Down
Loading