Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat(security): add negotiation class (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Sep 14, 2020
1 parent ed7d4c7 commit 1b602ba
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 14 deletions.
38 changes: 36 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* .falconPerfCounterTags("")
* .falconPushInterval(Duration.ofSeconds(10))
* .metaQueryTimeout(Duration.ofMillis(5000))
* .enableAuth(false)
* .build();
* }</pre>
*/
Expand All @@ -44,6 +45,7 @@ public class ClientOptions {
public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth";

public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
Expand All @@ -54,6 +56,7 @@ public class ClientOptions {
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
public static final boolean DEFAULT_ENABLE_AUTH = false;

private final String metaServers;
private final Duration operationTimeout;
Expand All @@ -63,6 +66,7 @@ public class ClientOptions {
private final Duration falconPushInterval;
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
private final boolean enableAuth;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -73,6 +77,7 @@ protected ClientOptions(Builder builder) {
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
this.enableAuth = builder.enableAuth;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -84,6 +89,7 @@ protected ClientOptions(ClientOptions original) {
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.enableAuth = original.enableAuth;
}

/**
Expand Down Expand Up @@ -143,6 +149,7 @@ public static ClientOptions create(String configPath) throws PException {
Duration metaQueryTimeout =
Duration.ofMillis(
config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis()));
boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH);

return ClientOptions.builder()
.metaServers(metaList)
Expand All @@ -152,6 +159,7 @@ public static ClientOptions create(String configPath) throws PException {
.falconPerfCounterTags(perfCounterTags)
.falconPushInterval(pushIntervalSecs)
.metaQueryTimeout(metaQueryTimeout)
.enableAuth(enableAuth)
.build();
}

Expand All @@ -169,7 +177,8 @@ public boolean equals(Object options) {
&& this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis();
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis()
&& this.enableAuth == clientOptions.enableAuth;
}
return false;
}
Expand All @@ -195,6 +204,8 @@ public String toString() {
+ enableWriteLimit
+ ", metaQueryTimeout(ms)="
+ metaQueryTimeout.toMillis()
+ ", enableAuth="
+ enableAuth
+ '}';
}

Expand All @@ -208,6 +219,7 @@ public static class Builder {
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private boolean enableAuth = DEFAULT_ENABLE_AUTH;

protected Builder() {}

Expand Down Expand Up @@ -310,6 +322,18 @@ public Builder metaQueryTimeout(Duration metaQueryTimeout) {
return this;
}

/**
* Whether to enable authentication. Defaults to {@literal false}, see {@link
* #DEFAULT_ENABLE_AUTH}.
*
* @param enableAuth
* @return {@code this}
*/
public Builder enableAuth(boolean enableAuth) {
this.enableAuth = enableAuth;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -337,7 +361,8 @@ public ClientOptions.Builder mutate() {
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout());
.metaQueryTimeout(getMetaQueryTimeout())
.enableAuth(isEnableAuth());
return builder;
}

Expand Down Expand Up @@ -417,4 +442,13 @@ public boolean isWriteLimitEnabled() {
public Duration getMetaQueryTimeout() {
return metaQueryTimeout;
}

/**
* Whether to enable authentication. Defaults to {@literal false}.
*
* @return whether to enable authentication.
*/
public boolean isEnableAuth() {
return enableAuth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ClusterManager extends Cluster {
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
private boolean enableAuth;

private static final String osName;

Expand Down Expand Up @@ -61,6 +62,8 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {
// so the replicaSessions should be initialized earlier
metaSession =
new MetaSession(this, metaList, (int) opts.getMetaQueryTimeout().toMillis(), 10, metaGroup);

this.enableAuth = opts.isEnableAuth();
}

public EventExecutor getExecutor() {
Expand All @@ -82,7 +85,10 @@ public ReplicaSession getReplicaSession(rpc_address address) {
if (ss != null) return ss;
ss =
new ReplicaSession(
address, replicaGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT));
address,
replicaGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
enableAuth);
replicaSessions.put(address, ss);
return ss;
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.xiaomi.infra.pegasus.rpc.async;

public class Negotiation {
public Negotiation(ReplicaSession session) {
this.session = session;
}

public void start() {
// TBD(zlw)
}

private ReplicaSession session;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ public enum ConnState {
DISCONNECTED
}

public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout) {
public ReplicaSession(
rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean enableAuth) {
this.address = address;
this.rpcGroup = rpcGroup;
this.enableAuth = enableAuth;

final ReplicaSession this_ = this;
boot = new Bootstrap();
Expand Down Expand Up @@ -72,7 +74,7 @@ public ReplicaSession(
EventLoopGroup rpcGroup,
int socketTimeout,
MessageResponseFilter filter) {
this(address, rpcGroup, socketTimeout);
this(address, rpcGroup, socketTimeout, false);
this.filter = filter;
}

Expand Down Expand Up @@ -207,6 +209,17 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
}

private void startNegotiation(Channel activeChannel) {
logger.info("{}: mark session state negotiation");
if (enableAuth) {
negotiation = new Negotiation(this);
negotiation.start();
} else {
logger.info("{}: mark session state connected");
markSessionConnected(activeChannel);
}
}

private void markSessionConnected(Channel activeChannel) {
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.CONNECTED;
Expand Down Expand Up @@ -368,7 +381,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel {} for session {} is active", ctx.channel().toString(), name());
markSessionConnected(ctx.channel());
startNegotiation(ctx.channel());
}

@Override
Expand Down Expand Up @@ -421,6 +434,8 @@ static final class VolatileFields {
private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private boolean enableAuth;
private Negotiation negotiation;

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManager;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,7 +51,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private InterceptorManger interceptorManger;
private InterceptorManager interceptorManager;

public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
Expand Down Expand Up @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, InternalTableOptions intern
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions());
this.interceptorManager = new InterceptorManager(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down Expand Up @@ -255,7 +255,7 @@ public void onRpcReply(ClientRequestRound round, long cachedConfigVersion, Strin
}

client_operator operator = round.getOperator();
interceptorManger.after(round, operator.rpc_error.errno, this);
interceptorManager.after(round, operator.rpc_error.errno, this);
boolean needQueryMeta = false;
switch (operator.rpc_error.errno) {
case ERR_OK:
Expand Down Expand Up @@ -363,7 +363,7 @@ void call(final ClientRequestRound round) {
tableConfig.replicas.get(round.getOperator().get_gpid().get_pidx());

if (handle.primarySession != null) {
interceptorManger.before(round, this);
interceptorManager.before(round, this);
// send request to primary
handle.primarySession.asyncSend(
round.getOperator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import java.util.ArrayList;
import java.util.List;

public class InterceptorManger {
public class InterceptorManager {

private List<TableInterceptor> interceptors = new ArrayList<>();

public InterceptorManger(TableOptions options) {
public InterceptorManager(TableOptions options) {
if (options.enableBackupRequest()) {
interceptors.add(new BackupRequestInterceptor(options.backupRequestDelayMs()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void testSessionConnectTimeout() throws InterruptedException {

long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
ReplicaSession rs = new ReplicaSession(addr, rpcGroup, 1000);
ReplicaSession rs = new ReplicaSession(addr, rpcGroup, 1000, false);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();
Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Expand Down

0 comments on commit 1b602ba

Please sign in to comment.