Skip to content

Commit

Permalink
refactor: reduce parameters in construction of internal Table and Clu…
Browse files Browse the repository at this point in the history
…ster (apache#104)
  • Loading branch information
Wu Tao authored May 12, 2020
1 parent 1f746ae commit 6a76f58
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 128 deletions.
23 changes: 9 additions & 14 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.KeyHasher;
import com.xiaomi.infra.pegasus.rpc.*;
import com.xiaomi.infra.pegasus.tools.Tools;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -56,9 +58,9 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws
table = tableMap.get(tableName);
if (table == null) {
try {
table =
new PegasusTable(
this, cluster.openTable(tableName, new PegasusHasher(), backupRequestDelayMs));
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
Table internalTable = cluster.openTable(tableName, options);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
throw new PException(e);
}
Expand All @@ -71,14 +73,7 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws

// pegasus client configuration keys
public static final String[] PEGASUS_CLIENT_CONFIG_KEYS =
new String[] {
Cluster.PEGASUS_META_SERVERS_KEY,
Cluster.PEGASUS_OPERATION_TIMEOUT_KEY,
Cluster.PEGASUS_ASYNC_WORKERS_KEY,
Cluster.PEGASUS_ENABLE_PERF_COUNTER_KEY,
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY,
PEGASUS_ENABLE_WRITE_LIMIT
};
ArrayUtils.add(ClusterOptions.allKeys(), PEGASUS_ENABLE_WRITE_LIMIT);

// configPath could be:
// - zk path: zk://host1:port1,host2:port2,host3:port3/path/to/config
Expand Down
55 changes: 2 additions & 53 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,14 @@
import org.apache.thrift.TException;

public abstract class Cluster {
public static final int MIN_SOCK_CONNECT_TIMEOUT = 1000;

public static final String PEGASUS_META_SERVERS_KEY = "meta_servers";

public static final String PEGASUS_OPERATION_TIMEOUT_KEY = "operation_timeout";
public static final String PEGASUS_OPERATION_TIMEOUT_DEF = "1000";

public static final String PEGASUS_ASYNC_WORKERS_KEY = "async_workers";
public static final String PEGASUS_ASYNC_WORKERS_DEF =
String.valueOf(Runtime.getRuntime().availableProcessors());

public static final String PEGASUS_ENABLE_PERF_COUNTER_KEY = "enable_perf_counter";
public static final String PEGASUS_ENABLE_PERF_COUNTER_VALUE = "false";

public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags";
public static final String PEGASUS_PERF_COUNTER_TAGS_DEF = "";

public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60";

public static Cluster createCluster(Properties config) throws IllegalArgumentException {
int operatorTimeout =
Integer.parseInt(
config.getProperty(PEGASUS_OPERATION_TIMEOUT_KEY, PEGASUS_OPERATION_TIMEOUT_DEF));
String metaList = config.getProperty(PEGASUS_META_SERVERS_KEY);
if (metaList == null) {
throw new IllegalArgumentException("no property set: " + PEGASUS_META_SERVERS_KEY);
}
metaList = metaList.trim();
if (metaList.isEmpty()) {
throw new IllegalArgumentException("invalid property: " + PEGASUS_META_SERVERS_KEY);
}
String[] address = metaList.split(",");

int asyncWorkers =
Integer.parseInt(config.getProperty(PEGASUS_ASYNC_WORKERS_KEY, PEGASUS_ASYNC_WORKERS_DEF));
boolean enablePerfCounter =
Boolean.parseBoolean(
config.getProperty(PEGASUS_ENABLE_PERF_COUNTER_KEY, PEGASUS_ENABLE_PERF_COUNTER_VALUE));
String perfCounterTags =
enablePerfCounter
? config.getProperty(PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF)
: null;
int pushIntervalSecs =
Integer.parseInt(
config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF));
return new ClusterManager(
operatorTimeout,
asyncWorkers,
enablePerfCounter,
perfCounterTags,
pushIntervalSecs,
address);
return new ClusterManager(ClusterOptions.create(config));
}

public abstract String[] getMetaList();

public abstract Table openTable(String name, KeyHasher function, int backupRequestDelayMs)
public abstract Table openTable(String name, TableOptions options)
throws ReplicationException, TException;

public abstract void close();
Expand Down
127 changes: 127 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc;

import java.util.Properties;

/** ClusterOptions is the internal options for connecting a Pegasus cluster. */
public class ClusterOptions {
public static final int MIN_SOCK_CONNECT_TIMEOUT = 1000;

public static final String PEGASUS_META_SERVERS_KEY = "meta_servers";

public static final String PEGASUS_OPERATION_TIMEOUT_KEY = "operation_timeout";
public static final String PEGASUS_OPERATION_TIMEOUT_DEF = "1000";

public static final String PEGASUS_ASYNC_WORKERS_KEY = "async_workers";
public static final String PEGASUS_ASYNC_WORKERS_DEF =
String.valueOf(Runtime.getRuntime().availableProcessors());

public static final String PEGASUS_ENABLE_PERF_COUNTER_KEY = "enable_perf_counter";
public static final String PEGASUS_ENABLE_PERF_COUNTER_DEF = "false";

public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags";
public static final String PEGASUS_PERF_COUNTER_TAGS_DEF = "";

public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60";

public static String[] allKeys() {
return new String[] {
PEGASUS_META_SERVERS_KEY,
PEGASUS_OPERATION_TIMEOUT_KEY,
PEGASUS_ASYNC_WORKERS_KEY,
PEGASUS_ENABLE_PERF_COUNTER_KEY,
PEGASUS_PERF_COUNTER_TAGS_KEY,
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY
};
}

private final int operationTimeout;
private final String[] metaList;
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String perfCounterTags;
private final int pushCounterIntervalSecs;

public int operationTimeout() {
return this.operationTimeout;
}

public String[] metaList() {
return this.metaList == null ? null : this.metaList.clone();
}

public int asyncWorkers() {
return this.asyncWorkers;
}

public boolean enablePerfCounter() {
return this.enablePerfCounter;
}

public String perfCounterTags() {
return this.perfCounterTags;
}

public int pushCounterIntervalSecs() {
return this.pushCounterIntervalSecs;
}

public static ClusterOptions create(Properties config) {
int operationTimeout =
Integer.parseInt(
config.getProperty(PEGASUS_OPERATION_TIMEOUT_KEY, PEGASUS_OPERATION_TIMEOUT_DEF));
String metaList = config.getProperty(PEGASUS_META_SERVERS_KEY);
if (metaList == null) {
throw new IllegalArgumentException("no property set: " + PEGASUS_META_SERVERS_KEY);
}
metaList = metaList.trim();
if (metaList.isEmpty()) {
throw new IllegalArgumentException("invalid property: " + PEGASUS_META_SERVERS_KEY);
}
String[] address = metaList.split(",");

int asyncWorkers =
Integer.parseInt(config.getProperty(PEGASUS_ASYNC_WORKERS_KEY, PEGASUS_ASYNC_WORKERS_DEF));
boolean enablePerfCounter =
Boolean.parseBoolean(
config.getProperty(PEGASUS_ENABLE_PERF_COUNTER_KEY, PEGASUS_ENABLE_PERF_COUNTER_DEF));
String perfCounterTags =
enablePerfCounter
? config.getProperty(PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF)
: null;
int pushIntervalSecs =
Integer.parseInt(
config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF));

return new ClusterOptions(
operationTimeout,
address,
asyncWorkers,
enablePerfCounter,
perfCounterTags,
pushIntervalSecs);
}

public static ClusterOptions forTest(String[] metaList) {
return new ClusterOptions(1000, metaList, 1, false, null, 60);
}

private ClusterOptions(
int operationTimeout,
String[] metaList,
int asyncWorkers,
boolean enablePerfCounter,
String perfCounterTags,
int pushCounterIntervalSecs) {
this.operationTimeout = operationTimeout;
this.metaList = metaList;
this.asyncWorkers = asyncWorkers;
this.enablePerfCounter = enablePerfCounter;
this.perfCounterTags = perfCounterTags;
this.pushCounterIntervalSecs = pushCounterIntervalSecs;
}
}
27 changes: 27 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc;

/** TableOptions is the internal options for opening a Pegasus table. */
public class TableOptions {
private final KeyHasher keyHasher;
private final int backupRequestDelayMs;

public KeyHasher keyHasher() {
return this.keyHasher;
}

public int backupRequestDelayMs() {
return this.backupRequestDelayMs;
}

public static TableOptions forTest() {
return new TableOptions(KeyHasher.DEFAULT, 0);
}

public TableOptions(KeyHasher h, int backupRequestDelay) {
this.keyHasher = h;
this.backupRequestDelayMs = backupRequestDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc.async;

import static java.lang.Math.max;
import static java.lang.Integer.max;

import com.xiaomi.infra.pegasus.base.rpc_address;
import com.xiaomi.infra.pegasus.metrics.MetricsManager;
import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.KeyHasher;
import com.xiaomi.infra.pegasus.rpc.ClusterOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand All @@ -20,7 +21,6 @@
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

/** Created by [email protected] on 16-11-11. */
public class ClusterManager extends Cluster {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterManager.class);

Expand All @@ -43,32 +43,25 @@ public class ClusterManager extends Cluster {
logger.info("operating system name: {}", osName);
}

public ClusterManager(
int timeout,
int io_threads,
boolean enableCounter,
String perfCounterTags,
int pushIntervalSecs,
String[] address_list)
throws IllegalArgumentException {
setTimeout(timeout);
this.enableCounter = enableCounter;
public ClusterManager(ClusterOptions opts) throws IllegalArgumentException {
setTimeout(opts.operationTimeout());
this.enableCounter = opts.enablePerfCounter();
if (enableCounter) {
MetricsManager.detectHostAndInit(perfCounterTags, pushIntervalSecs);
MetricsManager.detectHostAndInit(opts.perfCounterTags(), opts.pushCounterIntervalSecs());
}

replicaSessions = new ConcurrentHashMap<rpc_address, ReplicaSession>();
replicaGroup = getEventLoopGroupInstance(io_threads);
replicaGroup = getEventLoopGroupInstance(opts.asyncWorkers());
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);

metaList = address_list;
metaList = opts.metaList();
// the constructor of meta session is depend on the replicaSessions,
// so the replicaSessions should be initialized earlier
metaSession = new MetaSession(this, address_list, timeout, 10, metaGroup);
metaSession = new MetaSession(this, opts.metaList(), opts.operationTimeout(), 10, metaGroup);
}

public EventExecutor getExecutor(String name, int threadCount) {
public EventExecutor getExecutor() {
return tableGroup.next();
}

Expand All @@ -87,7 +80,9 @@ public ReplicaSession getReplicaSession(rpc_address address) {
if (ss != null) return ss;
ss =
new ReplicaSession(
address, replicaGroup, max(operationTimeout, Cluster.MIN_SOCK_CONNECT_TIMEOUT));
address,
replicaGroup,
max(operationTimeout, ClusterOptions.MIN_SOCK_CONNECT_TIMEOUT));
replicaSessions.put(address, ss);
return ss;
}
Expand Down Expand Up @@ -131,9 +126,8 @@ public String[] getMetaList() {
}

@Override
public TableHandler openTable(String name, KeyHasher h, int backupRequestDelayMs)
throws ReplicationException {
return new TableHandler(this, name, h, backupRequestDelayMs);
public TableHandler openTable(String name, TableOptions options) throws ReplicationException {
return new TableHandler(this, name, options);
}

@Override
Expand Down
Loading

0 comments on commit 6a76f58

Please sign in to comment.