diff --git a/cluster/loadbalance/consistent_hash.go b/cluster/loadbalance/consistent_hash.go new file mode 100644 index 0000000000..365e6a6624 --- /dev/null +++ b/cluster/loadbalance/consistent_hash.go @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "crypto/md5" + "encoding/json" + "fmt" + "hash/crc32" + "regexp" + "sort" + "strconv" + "strings" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +const ( + ConsistentHash = "consistenthash" + HashNodes = "hash.nodes" + HashArguments = "hash.arguments" +) + +var ( + selectors = make(map[string]*ConsistentHashSelector) + re = regexp.MustCompile(constant.COMMA_SPLIT_PATTERN) +) + +func init() { + extension.SetLoadbalance(ConsistentHash, NewConsistentHashLoadBalance) +} + +type ConsistentHashLoadBalance struct { +} + +func NewConsistentHashLoadBalance() cluster.LoadBalance { + return &ConsistentHashLoadBalance{} +} + +func (lb *ConsistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { + methodName := invocation.MethodName() + key := invokers[0].GetUrl().ServiceKey() + "." + methodName + + // hash the invokers + bs := make([]byte, 0) + for _, invoker := range invokers { + b, err := json.Marshal(invoker) + if err != nil { + return nil + } + bs = append(bs, b...) + } + hashCode := crc32.ChecksumIEEE(bs) + selector, ok := selectors[key] + if !ok || selector.hashCode != hashCode { + selectors[key] = newConsistentHashSelector(invokers, methodName, hashCode) + selector = selectors[key] + } + return selector.Select(invocation) +} + +type Uint32Slice []uint32 + +func (s Uint32Slice) Len() int { + return len(s) +} + +func (s Uint32Slice) Less(i, j int) bool { + return s[i] < s[j] +} + +func (s Uint32Slice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +type ConsistentHashSelector struct { + hashCode uint32 + replicaNum int + virtualInvokers map[uint32]protocol.Invoker + keys Uint32Slice + argumentIndex []int +} + +func newConsistentHashSelector(invokers []protocol.Invoker, methodName string, + hashCode uint32) *ConsistentHashSelector { + + selector := &ConsistentHashSelector{} + selector.virtualInvokers = make(map[uint32]protocol.Invoker) + selector.hashCode = hashCode + url := invokers[0].GetUrl() + selector.replicaNum = int(url.GetMethodParamInt(methodName, HashNodes, 160)) + indices := re.Split(url.GetMethodParam(methodName, HashArguments, "0"), -1) + for _, index := range indices { + i, err := strconv.Atoi(index) + if err != nil { + return nil + } + selector.argumentIndex = append(selector.argumentIndex, i) + } + for _, invoker := range invokers { + u := invoker.GetUrl() + address := u.Ip + ":" + u.Port + for i := 0; i < selector.replicaNum/4; i++ { + digest := md5.Sum([]byte(address + strconv.Itoa(i))) + for j := 0; j < 4; j++ { + key := selector.hash(digest, j) + selector.keys = append(selector.keys, key) + selector.virtualInvokers[key] = invoker + } + } + } + sort.Sort(selector.keys) + return selector +} + +func (c *ConsistentHashSelector) Select(invocation protocol.Invocation) protocol.Invoker { + key := c.toKey(invocation.Arguments()) + digest := md5.Sum([]byte(key)) + return c.selectForKey(c.hash(digest, 0)) +} + +func (c *ConsistentHashSelector) toKey(args []interface{}) string { + var sb strings.Builder + for i := range c.argumentIndex { + if i >= 0 && i < len(args) { + fmt.Fprint(&sb, args[i].(string)) + } + } + return sb.String() +} + +func (c *ConsistentHashSelector) selectForKey(hash uint32) protocol.Invoker { + idx := sort.Search(len(c.keys), func(i int) bool { + return c.keys[i] >= hash + }) + if idx == len(c.keys) { + idx = 0 + } + return c.virtualInvokers[c.keys[idx]] +} + +func (c *ConsistentHashSelector) hash(digest [16]byte, i int) uint32 { + return uint32((digest[3+i*4]&0xFF)<<24) | uint32((digest[2+i*4]&0xFF)<<16) | + uint32((digest[1+i*4]&0xFF)<<8) | uint32(digest[i*4]&0xFF)&0xFFFFFFF +} diff --git a/cluster/loadbalance/consistent_hash_test.go b/cluster/loadbalance/consistent_hash_test.go new file mode 100644 index 0000000000..174d5715dd --- /dev/null +++ b/cluster/loadbalance/consistent_hash_test.go @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/suite" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestConsistentHashSelectorSuite(t *testing.T) { + suite.Run(t, new(consistentHashSelectorSuite)) +} + +type consistentHashSelectorSuite struct { + suite.Suite + selector *ConsistentHashSelector +} + +func (s *consistentHashSelectorSuite) SetupTest() { + var invokers []protocol.Invoker + url, _ := common.NewURL(context.TODO(), + "dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + invokers = append(invokers, protocol.NewBaseInvoker(url)) + s.selector = newConsistentHashSelector(invokers, "echo", 999944) +} + +func (s *consistentHashSelectorSuite) TestToKey() { + result := s.selector.toKey([]interface{}{"username", "age"}) + s.Equal(result, "usernameage") +} + +func (s *consistentHashSelectorSuite) TestSelectForKey() { + url1, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080") + url2, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081") + s.selector.virtualInvokers = make(map[uint32]protocol.Invoker) + s.selector.virtualInvokers[99874] = protocol.NewBaseInvoker(url1) + s.selector.virtualInvokers[9999945] = protocol.NewBaseInvoker(url2) + s.selector.keys = []uint32{99874, 9999945} + result := s.selector.selectForKey(9999944) + s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?") +} + +func TestConsistentHashLoadBalanceSuite(t *testing.T) { + suite.Run(t, new(consistentHashLoadBalanceSuite)) +} + +type consistentHashLoadBalanceSuite struct { + suite.Suite + url1 common.URL + url2 common.URL + url3 common.URL + invokers []protocol.Invoker + invoker1 protocol.Invoker + invoker2 protocol.Invoker + invoker3 protocol.Invoker + lb cluster.LoadBalance +} + +func (s *consistentHashLoadBalanceSuite) SetupTest() { + var err error + s.url1, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + s.NoError(err) + s.url2, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + s.NoError(err) + s.url3, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + s.NoError(err) + + s.invoker1 = protocol.NewBaseInvoker(s.url1) + s.invoker2 = protocol.NewBaseInvoker(s.url2) + s.invoker3 = protocol.NewBaseInvoker(s.url3) + + s.invokers = append(s.invokers, s.invoker1, s.invoker2, s.invoker3) + s.lb = NewConsistentHashLoadBalance() +} + +func (s *consistentHashLoadBalanceSuite) TestSelect() { + args := []interface{}{"name", "password", "age"} + invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil)) + s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080") + + args = []interface{}{"ok", "abc"} + invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil)) + s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082") +} diff --git a/common/constant/default.go b/common/constant/default.go index cb6d68af05..f8c8ad4903 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -67,3 +67,7 @@ const ( APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators" PROVIDER_CATEGORY = "providers" ) + +const ( + COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*" +) diff --git a/go.mod b/go.mod index 4d1f8acbba..0476630f18 100644 --- a/go.mod +++ b/go.mod @@ -51,3 +51,5 @@ require ( google.golang.org/grpc v1.22.1 gopkg.in/yaml.v2 v2.2.2 ) + +go 1.13