Skip to content

Commit

Permalink
Add replication setting for NetworkTopologyStrategy of Cassandra
Browse files Browse the repository at this point in the history
Change-Id: Idc10d03f1b23daf9bf73bc2e392fde9971fc36d9
  • Loading branch information
zhoney committed Apr 11, 2019
1 parent 727a0e4 commit 5c5c139
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

package com.baidu.hugegraph.backend.store.cassandra;

import com.baidu.hugegraph.config.ConfigListOption;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;

import static com.baidu.hugegraph.config.OptionChecker.allowValues;
import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty;
import static com.baidu.hugegraph.config.OptionChecker.rangeInt;

import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;

public class CassandraOptions extends OptionHolder {

private CassandraOptions() {
Expand Down Expand Up @@ -91,17 +92,20 @@ public static synchronized CassandraOptions instance() {
public static final ConfigOption<String> CASSANDRA_STRATEGY =
new ConfigOption<>(
"cassandra.keyspace.strategy",
"The keyspace strategy.",
disallowEmpty(),
"The keyspace strategy, valid value is " +
"SimpleStrategy or NetworkTopologyStrategy.",
allowValues("SimpleStrategy", "NetworkTopologyStrategy"),
"SimpleStrategy"
);

public static final ConfigOption<Integer> CASSANDRA_REPLICATION =
new ConfigOption<>(
public static final ConfigListOption<String> CASSANDRA_REPLICATION =
new ConfigListOption<>(
"cassandra.keyspace.replication",
"The keyspace replication factor.",
rangeInt(1, 100),
3
"The keyspace replication factor of SimpleStrategy, " +
"like '3'. Or data center replicas of " +
"NetworkTopologyStrategy, like 'dc1:2,dc2:1'.",
disallowEmpty(),
"3"
);

public static final ConfigOption<String> CASSANDRA_COMPRESSION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.slf4j.Logger;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.Query;
Expand Down Expand Up @@ -373,20 +374,9 @@ protected Cluster cluster() {
}

protected void initKeyspace() {
// Replication strategy: SimpleStrategy or NetworkTopologyStrategy
String strategy = this.conf.get(CassandraOptions.CASSANDRA_STRATEGY);

// Replication factor
int factor = this.conf.get(CassandraOptions.CASSANDRA_REPLICATION);

Map<String, Object> replication = new HashMap<>();
replication.putIfAbsent("class", strategy);
replication.putIfAbsent("replication_factor", factor);

Statement stmt = SchemaBuilder.createKeyspace(this.keyspace)
.ifNotExists().with()
.replication(replication);

.replication(parseReplica(this.conf));
// Create keyspace with non-keyspace-session
LOG.debug("Create keyspace: {}", stmt);
Session session = this.cluster().connect();
Expand All @@ -399,6 +389,51 @@ protected void initKeyspace() {
}
}

private static Map<String, Object> parseReplica(HugeConfig conf) {
Map<String, Object> replication = new HashMap<>();
// Replication strategy: SimpleStrategy or NetworkTopologyStrategy
String strategy = conf.get(CassandraOptions.CASSANDRA_STRATEGY);
replication.put("class", strategy);

switch (strategy) {
case "SimpleStrategy":
List<String> replicas =
conf.get(CassandraOptions.CASSANDRA_REPLICATION);
E.checkArgument(replicas.size() == 1,
"Individual factor value should be provided " +
"with SimpleStrategy for Cassandra");
int factor = convertFactor(replicas.get(0));
replication.put("replication_factor", factor);
break;
case "NetworkTopologyStrategy":
// The replicas format is like 'dc1:2,dc2:1'
Map<String, String> replicaMap =
conf.getMap(CassandraOptions.CASSANDRA_REPLICATION);
for (Map.Entry<String, String> e : replicaMap.entrySet()) {
E.checkArgument(!e.getKey().isEmpty(),
"The data center can't be empty");
replication.put(e.getKey(), convertFactor(e.getValue()));
}
break;
default:
throw new AssertionError(String.format(
"Illegal Cassandra strategy '%s', valid strategy " +
"is 'SimpleStrategy' or 'NetworkTopologyStrategy'",
strategy));
}
return replication;
}

private static int convertFactor(String factor) {
try {
return Integer.valueOf(factor);
} catch (NumberFormatException e) {
throw new HugeException(
"Integer factor value must be provided with " +
"SimpleStrategy for Cassandra, but got '%s'", factor);
}
}

protected void clearKeyspace() {
// Drop keyspace with non-keyspace-session
Statement stmt = SchemaBuilder.dropKeyspace(this.keyspace).ifExists();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static void main(String[] args)

String input = args[0];
File output = new File(input + ".default");
System.out.println("Output config: " + input);
System.out.println("Input config: " + input);
System.out.println("Output config: " + output.getPath());

RegisterUtil.registerBackends();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public synchronized void open(HugeConfig config) {
this.sessions = this.open(config, this.tableNames());

// Open tables with optimized disk
List<String> disks = config.get(RocksDBOptions.DATA_DISKS);
Map<String, String> disks = config.getMap(RocksDBOptions.DATA_DISKS);
if (!disks.isEmpty()) {
this.parseTableDiskMapping(disks);
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
Expand Down Expand Up @@ -446,19 +446,19 @@ private void checkOpened() {
this.database, this.provider.type());
}

private void parseTableDiskMapping(List<String> disks) {
private void parseTableDiskMapping(Map<String, String> disks) {
this.tableDiskMapping.clear();
for (String disk : disks) {
for (Map.Entry<String, String> disk : disks.entrySet()) {
// The format of `disk` like: `graph/vertex: /path/to/disk1`
String[] pair = disk.split(":", 2);
E.checkState(pair.length == 2,
"Invalid disk format: '%s', expect `NAME:PATH`", disk);
String name = pair[0].trim();
String path = pair[1].trim();
pair = name.split("/", 2);
E.checkState(pair.length == 2,
"Invalid disk key format: '%s', expect `STORE/TABLE`",
name);
String name = disk.getKey();
String path = disk.getValue();
E.checkArgument(!name.isEmpty() && !path.isEmpty(),
"Invalid disk format: '%s', expect `NAME:PATH`",
disk);
String[] pair = name.split("/", 2);
E.checkArgument(pair.length == 2,
"Invalid disk key format: '%s', " +
"expect `STORE/TABLE`", name);
String store = pair[0].trim();
HugeType table = HugeType.valueOf(pair[1].trim().toUpperCase());
if (this.store.equals(store)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.baidu.hugegraph.testutil;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;

Expand All @@ -31,6 +33,7 @@
import com.baidu.hugegraph.testutil.FakeObjects.FakeEdge;
import com.baidu.hugegraph.testutil.FakeObjects.FakeVertex;
import com.baidu.hugegraph.util.DateUtil;
import com.baidu.hugegraph.util.E;

public class Utils {

Expand Down Expand Up @@ -80,4 +83,29 @@ public static boolean contains(List<Edge> edges, FakeEdge fakeEdge) {
public static Date date(String rawDate) {
return DateUtil.parse(rawDate);
}

public static <T> T invokeStatic(Class<?> clazz, String methodName,
Object... args) {
Class<?>[] classes = new Class<?>[args.length];
int i = 0;
for (Object arg : args) {
E.checkArgument(arg != null, "The argument can't be null");
classes[i++] = arg.getClass();
}
try {
Method method = clazz.getDeclaredMethod(methodName, classes);
method.setAccessible(true);
@SuppressWarnings("unchecked")
T result = (T) method.invoke(null, args);
return result;
} catch (NoSuchMethodException e) {
throw new RuntimeException(String.format(
"Can't find method '%s' of class '%s'",
methodName, clazz), e);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(String.format(
"Can't execute method '%s' of class '%s'",
methodName, clazz), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.baidu.hugegraph.unit.cache.RamCacheTest;
import com.baidu.hugegraph.unit.core.AnalyzerTest;
import com.baidu.hugegraph.unit.core.BackendMutationTest;
import com.baidu.hugegraph.unit.core.CassandraTest;
import com.baidu.hugegraph.unit.core.ConditionQueryFlattenTest;
import com.baidu.hugegraph.unit.core.EdgeIdTest;
import com.baidu.hugegraph.unit.core.JsonUtilTest;
Expand All @@ -40,6 +41,7 @@

VersionTest.class,
BackendMutationTest.class,
CassandraTest.class,
ConditionQueryFlattenTest.class,
EdgeIdTest.class,
AnalyzerTest.class,
Expand Down
Loading

0 comments on commit 5c5c139

Please sign in to comment.