From 5c5c139a0e320ba6ca1dcad3fcae6e6632173e78 Mon Sep 17 00:00:00 2001 From: zhangyi51 Date: Thu, 14 Mar 2019 21:31:29 +0800 Subject: [PATCH] Add replication setting for NetworkTopologyStrategy of Cassandra Change-Id: Idc10d03f1b23daf9bf73bc2e392fde9971fc36d9 --- .../store/cassandra/CassandraOptions.java | 24 +- .../store/cassandra/CassandraStore.java | 59 ++++- .../com/baidu/hugegraph/cmd/ConfDumper.java | 2 +- .../backend/store/rocksdb/RocksDBStore.java | 24 +- .../com/baidu/hugegraph/testutil/Utils.java | 28 +++ .../baidu/hugegraph/unit/UnitTestSuite.java | 2 + .../hugegraph/unit/core/CassandraTest.java | 232 ++++++++++++++++++ 7 files changed, 336 insertions(+), 35 deletions(-) create mode 100644 hugegraph-test/src/main/java/com/baidu/hugegraph/unit/core/CassandraTest.java diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraOptions.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraOptions.java index ddc63e7146..56e45ed101 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraOptions.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraOptions.java @@ -19,13 +19,14 @@ package com.baidu.hugegraph.backend.store.cassandra; +import com.baidu.hugegraph.config.ConfigListOption; +import com.baidu.hugegraph.config.ConfigOption; +import com.baidu.hugegraph.config.OptionHolder; + import static com.baidu.hugegraph.config.OptionChecker.allowValues; import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty; import static com.baidu.hugegraph.config.OptionChecker.rangeInt; -import com.baidu.hugegraph.config.ConfigOption; -import com.baidu.hugegraph.config.OptionHolder; - public class CassandraOptions extends OptionHolder { private CassandraOptions() { @@ -91,17 +92,20 @@ public static synchronized CassandraOptions instance() { public static final ConfigOption CASSANDRA_STRATEGY = new ConfigOption<>( "cassandra.keyspace.strategy", - "The keyspace strategy.", - disallowEmpty(), + "The keyspace strategy, valid value is " + + "SimpleStrategy or NetworkTopologyStrategy.", + allowValues("SimpleStrategy", "NetworkTopologyStrategy"), "SimpleStrategy" ); - public static final ConfigOption CASSANDRA_REPLICATION = - new ConfigOption<>( + public static final ConfigListOption CASSANDRA_REPLICATION = + new ConfigListOption<>( "cassandra.keyspace.replication", - "The keyspace replication factor.", - rangeInt(1, 100), - 3 + "The keyspace replication factor of SimpleStrategy, " + + "like '3'. Or data center replicas of " + + "NetworkTopologyStrategy, like 'dc1:2,dc2:1'.", + disallowEmpty(), + "3" ); public static final ConfigOption CASSANDRA_COMPRESSION = diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java index e7af77dfe8..9c49b33c5f 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.query.Query; @@ -373,20 +374,9 @@ protected Cluster cluster() { } protected void initKeyspace() { - // Replication strategy: SimpleStrategy or NetworkTopologyStrategy - String strategy = this.conf.get(CassandraOptions.CASSANDRA_STRATEGY); - - // Replication factor - int factor = this.conf.get(CassandraOptions.CASSANDRA_REPLICATION); - - Map replication = new HashMap<>(); - replication.putIfAbsent("class", strategy); - replication.putIfAbsent("replication_factor", factor); - Statement stmt = SchemaBuilder.createKeyspace(this.keyspace) .ifNotExists().with() - .replication(replication); - + .replication(parseReplica(this.conf)); // Create keyspace with non-keyspace-session LOG.debug("Create keyspace: {}", stmt); Session session = this.cluster().connect(); @@ -399,6 +389,51 @@ protected void initKeyspace() { } } + private static Map parseReplica(HugeConfig conf) { + Map replication = new HashMap<>(); + // Replication strategy: SimpleStrategy or NetworkTopologyStrategy + String strategy = conf.get(CassandraOptions.CASSANDRA_STRATEGY); + replication.put("class", strategy); + + switch (strategy) { + case "SimpleStrategy": + List replicas = + conf.get(CassandraOptions.CASSANDRA_REPLICATION); + E.checkArgument(replicas.size() == 1, + "Individual factor value should be provided " + + "with SimpleStrategy for Cassandra"); + int factor = convertFactor(replicas.get(0)); + replication.put("replication_factor", factor); + break; + case "NetworkTopologyStrategy": + // The replicas format is like 'dc1:2,dc2:1' + Map replicaMap = + conf.getMap(CassandraOptions.CASSANDRA_REPLICATION); + for (Map.Entry e : replicaMap.entrySet()) { + E.checkArgument(!e.getKey().isEmpty(), + "The data center can't be empty"); + replication.put(e.getKey(), convertFactor(e.getValue())); + } + break; + default: + throw new AssertionError(String.format( + "Illegal Cassandra strategy '%s', valid strategy " + + "is 'SimpleStrategy' or 'NetworkTopologyStrategy'", + strategy)); + } + return replication; + } + + private static int convertFactor(String factor) { + try { + return Integer.valueOf(factor); + } catch (NumberFormatException e) { + throw new HugeException( + "Integer factor value must be provided with " + + "SimpleStrategy for Cassandra, but got '%s'", factor); + } + } + protected void clearKeyspace() { // Drop keyspace with non-keyspace-session Statement stmt = SchemaBuilder.dropKeyspace(this.keyspace).ifExists(); diff --git a/hugegraph-dist/src/main/java/com/baidu/hugegraph/cmd/ConfDumper.java b/hugegraph-dist/src/main/java/com/baidu/hugegraph/cmd/ConfDumper.java index 22352b0397..fd98075333 100644 --- a/hugegraph-dist/src/main/java/com/baidu/hugegraph/cmd/ConfDumper.java +++ b/hugegraph-dist/src/main/java/com/baidu/hugegraph/cmd/ConfDumper.java @@ -43,7 +43,7 @@ public static void main(String[] args) String input = args[0]; File output = new File(input + ".default"); - System.out.println("Output config: " + input); + System.out.println("Input config: " + input); System.out.println("Output config: " + output.getPath()); RegisterUtil.registerBackends(); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index 5fc8620adf..c43e824a11 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -155,7 +155,7 @@ public synchronized void open(HugeConfig config) { this.sessions = this.open(config, this.tableNames()); // Open tables with optimized disk - List disks = config.get(RocksDBOptions.DATA_DISKS); + Map disks = config.getMap(RocksDBOptions.DATA_DISKS); if (!disks.isEmpty()) { this.parseTableDiskMapping(disks); for (Entry e : this.tableDiskMapping.entrySet()) { @@ -446,19 +446,19 @@ private void checkOpened() { this.database, this.provider.type()); } - private void parseTableDiskMapping(List disks) { + private void parseTableDiskMapping(Map disks) { this.tableDiskMapping.clear(); - for (String disk : disks) { + for (Map.Entry disk : disks.entrySet()) { // The format of `disk` like: `graph/vertex: /path/to/disk1` - String[] pair = disk.split(":", 2); - E.checkState(pair.length == 2, - "Invalid disk format: '%s', expect `NAME:PATH`", disk); - String name = pair[0].trim(); - String path = pair[1].trim(); - pair = name.split("/", 2); - E.checkState(pair.length == 2, - "Invalid disk key format: '%s', expect `STORE/TABLE`", - name); + String name = disk.getKey(); + String path = disk.getValue(); + E.checkArgument(!name.isEmpty() && !path.isEmpty(), + "Invalid disk format: '%s', expect `NAME:PATH`", + disk); + String[] pair = name.split("/", 2); + E.checkArgument(pair.length == 2, + "Invalid disk key format: '%s', " + + "expect `STORE/TABLE`", name); String store = pair[0].trim(); HugeType table = HugeType.valueOf(pair[1].trim().toUpperCase()); if (this.store.equals(store)) { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/testutil/Utils.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/testutil/Utils.java index a3342dbde2..bdfe9c0604 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/testutil/Utils.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/testutil/Utils.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.testutil; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Date; import java.util.List; @@ -31,6 +33,7 @@ import com.baidu.hugegraph.testutil.FakeObjects.FakeEdge; import com.baidu.hugegraph.testutil.FakeObjects.FakeVertex; import com.baidu.hugegraph.util.DateUtil; +import com.baidu.hugegraph.util.E; public class Utils { @@ -80,4 +83,29 @@ public static boolean contains(List edges, FakeEdge fakeEdge) { public static Date date(String rawDate) { return DateUtil.parse(rawDate); } + + public static T invokeStatic(Class clazz, String methodName, + Object... args) { + Class[] classes = new Class[args.length]; + int i = 0; + for (Object arg : args) { + E.checkArgument(arg != null, "The argument can't be null"); + classes[i++] = arg.getClass(); + } + try { + Method method = clazz.getDeclaredMethod(methodName, classes); + method.setAccessible(true); + @SuppressWarnings("unchecked") + T result = (T) method.invoke(null, args); + return result; + } catch (NoSuchMethodException e) { + throw new RuntimeException(String.format( + "Can't find method '%s' of class '%s'", + methodName, clazz), e); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(String.format( + "Can't execute method '%s' of class '%s'", + methodName, clazz), e); + } + } } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/UnitTestSuite.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/UnitTestSuite.java index bb9a6b949e..98d0297fbe 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/UnitTestSuite.java @@ -26,6 +26,7 @@ import com.baidu.hugegraph.unit.cache.RamCacheTest; import com.baidu.hugegraph.unit.core.AnalyzerTest; import com.baidu.hugegraph.unit.core.BackendMutationTest; +import com.baidu.hugegraph.unit.core.CassandraTest; import com.baidu.hugegraph.unit.core.ConditionQueryFlattenTest; import com.baidu.hugegraph.unit.core.EdgeIdTest; import com.baidu.hugegraph.unit.core.JsonUtilTest; @@ -40,6 +41,7 @@ VersionTest.class, BackendMutationTest.class, + CassandraTest.class, ConditionQueryFlattenTest.class, EdgeIdTest.class, AnalyzerTest.class, diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/core/CassandraTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/core/CassandraTest.java new file mode 100644 index 0000000000..7efa00f8bc --- /dev/null +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/core/CassandraTest.java @@ -0,0 +1,232 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.unit.core; + +import java.util.Map; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.junit.After; +import com.baidu.hugegraph.testutil.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.baidu.hugegraph.backend.store.cassandra.CassandraOptions; +import com.baidu.hugegraph.backend.store.cassandra.CassandraStore; +import com.baidu.hugegraph.config.HugeConfig; +import com.baidu.hugegraph.testutil.Utils; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class CassandraTest { + + @Before + public void setup() { + // pass + } + + @After + public void teardown() { + // pass + } + + @Test + public void testParseRepilcaWithSimpleStrategy() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("SimpleStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("5")); + HugeConfig config = new HugeConfig(conf); + + Map result = Utils.invokeStatic(CassandraStore.class, + "parseReplica", + config); + + Map expected = ImmutableMap.of( + "class", "SimpleStrategy", + "replication_factor", 5); + Assert.assertEquals(expected, result); + } + + @Test + public void testParseRepilcaWithNetworkTopologyStrategy() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("NetworkTopologyStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("dc1:2", "dc2:1")); + HugeConfig config = new HugeConfig(conf); + + Map result = Utils.invokeStatic(CassandraStore.class, + "parseReplica", + config); + + Map expected = ImmutableMap.of( + "class", "NetworkTopologyStrategy", + "dc1", 2, + "dc2", 1); + Assert.assertEquals(expected, result); + } + + @Test + public void testParseRepilcaWithSimpleStrategyEmptyReplica() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("SimpleStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } + + @Test + public void testParseRepilcaWithSimpleStrategyDoubleReplica() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("SimpleStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("1.5")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } + + @Test + public void testParseRepilcaWithSimpleStrategyStringReplica() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("SimpleStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("xxx")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } + + @Test + public void testParseRepilcaWithNetworkTopologyStrategyStringReplica() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("NetworkTopologyStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("dc1:2", "dc2:x")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } + + @Test + public void testParseRepilcaWithNetworkTopologyStrategyEmptyDatacenter() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("NetworkTopologyStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of(":2", "dc2:1")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } + + @Test + public void testParseRepilcaWithNetworkTopologyStrategyEmptyReplica() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("NetworkTopologyStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("dc1:", "dc2:1")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } + + @Test + public void testParseRepilcaWithNetworkTopologyStrategyDoubleReplica() { + String strategy = CassandraOptions.CASSANDRA_STRATEGY.name(); + String replica = CassandraOptions.CASSANDRA_REPLICATION.name(); + + Configuration conf = Mockito.mock(PropertiesConfiguration.class); + Mockito.when(conf.getKeys()) + .thenReturn(ImmutableList.of(strategy, replica).iterator()); + Mockito.when(conf.getProperty(strategy)) + .thenReturn("NetworkTopologyStrategy"); + Mockito.when(conf.getProperty(replica)) + .thenReturn(ImmutableList.of("dc1:3.5", "dc2:1")); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(RuntimeException.class, () -> { + Utils.invokeStatic(CassandraStore.class, "parseReplica", config); + }); + } +}