Skip to content

Commit

Permalink
Merge branch 'master' of github.com:sado0823/go-kitx
Browse files Browse the repository at this point in the history
  • Loading branch information
sado0823 committed Oct 11, 2023
2 parents 5d29dac + ff1ba6c commit 4f0b100
Show file tree
Hide file tree
Showing 4 changed files with 341 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/pelletier/go-toml/v2 v2.0.5
github.com/pkg/errors v0.9.1
github.com/spaolacci/murmur3 v1.1.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/jaeger v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
242 changes: 242 additions & 0 deletions kit/hash/consistent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package hash

import (
"fmt"
"sort"
"strconv"
"sync"

"github.com/spaolacci/murmur3"
)

const (
maxWeight = 100 // 最大权重参数
minVirtual // 最小虚拟节点数量
prime = 16777619 // 质数, 减少hash冲突
)

type (
Consistent interface {
Add(node interface{}, opts ...ConsistentAddWith)
Get(node interface{}) (value interface{}, has bool)
Remove(node interface{})
// Cascade 多次hash排序后的hash数组
Cascade(node interface{}, opts ...ConsistentAddWith) []uint64
}

consistent struct {
opt *option
vtrNum int64 // 虚拟节点数量
vtrKeys []uint64 // 虚拟节点值
vtrRing map[uint64][]interface{} // 虚拟节点环, 每一个值内存储对应真实节点数据
nodes map[string]struct{} // 真实节点
lock sync.RWMutex
}
)

type (
option struct {
vtr int64
hash func([]byte) uint64
}

ConsistentWith func(opt *option)
)

func ConsistentWithVtr(num int64) ConsistentWith {
return func(opt *option) {
opt.vtr = num
}
}

func ConsistentWithHash(hash func([]byte) uint64) ConsistentWith {
return func(opt *option) {
opt.hash = hash
}
}

type (
optionAdd struct {
vtr int64
weight int64
}
ConsistentAddWith func(opt *optionAdd)
)

func ConsistentAddWithVtr(num int64) ConsistentAddWith {
return func(opt *optionAdd) {
opt.vtr = num
}
}

func ConsistentAddWithWeight(weight int64) ConsistentAddWith {
return func(opt *optionAdd) {
opt.weight = weight
}
}

func NewConsistent(withs ...ConsistentWith) Consistent {
dft := &option{
vtr: minVirtual,
hash: murmur3.Sum64,
}

for i := range withs {
withs[i](dft)
}

if dft.vtr < minVirtual {
dft.vtr = minVirtual
}

if dft.hash == nil {
dft.hash = murmur3.Sum64
}

return &consistent{
opt: dft,
vtrNum: dft.vtr,
vtrKeys: make([]uint64, 0),
vtrRing: make(map[uint64][]interface{}),
nodes: make(map[string]struct{}),
}
}

func (c *consistent) Cascade(node interface{}, opts ...ConsistentAddWith) []uint64 {
dft := &optionAdd{vtr: c.vtrNum, weight: 0}
for i := range opts {
opts[i](dft)
}

vtr := c.vtrNum
if dft.weight > 0 {
vtr = c.vtrNum * dft.weight / maxWeight
}

if vtr > c.vtrNum {
vtr = c.vtrNum
}

nodeExpr := c.marshal(node)
c.lock.Lock()
defer c.lock.Unlock()

keys := make([]uint64, 0)
for i := int64(0); i < vtr; i++ {
hashV := c.opt.hash([]byte(nodeExpr + strconv.Itoa(int(i))))
keys = append(keys, hashV)
}

sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})

return keys
}

func (c *consistent) Add(node interface{}, opts ...ConsistentAddWith) {
dft := &optionAdd{vtr: c.vtrNum, weight: 0}
for i := range opts {
opts[i](dft)
}

vtr := c.vtrNum
if dft.weight > 0 {
vtr = c.vtrNum * dft.weight / maxWeight
}

if vtr > c.vtrNum {
vtr = c.vtrNum
}

nodeExpr := c.marshal(node)
c.lock.Lock()
defer c.lock.Unlock()
c.addNode(nodeExpr)

for i := int64(0); i < vtr; i++ {
hashV := c.opt.hash([]byte(nodeExpr + strconv.Itoa(int(i))))
c.vtrKeys = append(c.vtrKeys, hashV)
c.vtrRing[hashV] = append(c.vtrRing[hashV], node)
}

sort.Slice(c.vtrKeys, func(i, j int) bool {
return c.vtrKeys[i] < c.vtrKeys[j]
})
}

func (c *consistent) Get(node interface{}) (value interface{}, has bool) {
c.lock.Lock()
defer c.lock.Unlock()

if len(c.vtrRing) == 0 {
return nil, false
}

nodeExpr := c.marshal(node)
hashV := c.opt.hash([]byte(nodeExpr))
index := sort.Search(len(c.vtrKeys), func(i int) bool {
return c.vtrKeys[i] >= hashV
}) % len(c.vtrKeys)

nodes := c.vtrRing[c.vtrKeys[index]]
switch len(nodes) {
case 0:
return nil, false
case 1:
return nodes[0], true
default:
index := c.opt.hash([]byte(c.innerMarshal(node)))
pos := int(index % uint64(len(nodes)))
return nodes[pos], true
}
}

func (c *consistent) Remove(node interface{}) {
nodeExpr := c.marshal(node)

c.lock.Lock()
defer c.lock.Unlock()

if _, ok := c.nodes[nodeExpr]; !ok {
return
}

for i := int64(0); i < c.vtrNum; i++ {
hashV := c.opt.hash([]byte(nodeExpr + strconv.Itoa(int(i))))
index := sort.Search(len(c.vtrKeys), func(i int) bool {
return c.vtrKeys[i] >= hashV
})
if index < len(c.vtrKeys) && c.vtrKeys[index] == hashV {
c.vtrKeys = append(c.vtrKeys[:index], c.vtrKeys[index+1:]...)
}

if _, ok := c.vtrRing[hashV]; ok {
newNodes := c.vtrRing[hashV][:0]
for _, node := range c.vtrRing[hashV] {
if c.marshal(node) != nodeExpr {
newNodes = append(newNodes, node)
}
}
if len(newNodes) > 0 {
c.vtrRing[hashV] = newNodes
} else {
delete(c.vtrRing, hashV)
}
}
}

delete(c.nodes, nodeExpr)
}

func (c *consistent) addNode(key string) {
c.nodes[key] = struct{}{}
}

func (c *consistent) marshal(v interface{}) string {
return fmt.Sprintf("%v", v)
}

func (c *consistent) innerMarshal(node interface{}) string {
return fmt.Sprintf("%d:%v", prime, node)
}
96 changes: 96 additions & 0 deletions kit/hash/consistent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package hash

import (
"math"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_NewConsistent(t *testing.T) {
assert.NotPanics(t, func() {
NewConsistent()
})

assert.NotPanics(t, func() {
NewConsistent(ConsistentWithHash(nil))
})

assert.NotPanics(t, func() {
NewConsistent(ConsistentWithVtr(-1))
})
}

func Test_Consistent_Get(t *testing.T) {
ch := NewConsistent()
for i := 0; i < 20; i++ {
ch.Add("prefix" + strconv.Itoa(i))
}

keys := make(map[int]string, 1000)
for i := 0; i < 1000; i++ {
key, ok := ch.Get(1000 + i)
assert.True(t, ok)
assert.NotNil(t, key)
keys[i] = key.(string)
}

}

func Test_Consistent_Cascade(t *testing.T) {
hash := NewConsistent(ConsistentWithVtr(10))

cascade := hash.Cascade("foo")
t.Logf("cascade: %v", cascade)
assert.True(t, len(cascade) == 100)
}

func TestConsistentHash(t *testing.T) {
ch := NewConsistent()
val, ok := ch.Get("any")
assert.False(t, ok)
assert.Nil(t, val)

for i := 0; i < 20; i++ {
ch.Add("localhost:"+strconv.Itoa(i), ConsistentAddWithVtr(minVirtual<<1))
}

keys := make(map[string]int)
for i := 0; i < 1000; i++ {
key, ok := ch.Get(1000 + i)
assert.True(t, ok)
keys[key.(string)]++
}

mi := make(map[interface{}]int, len(keys))
for k, v := range keys {
mi[k] = v
}
entropy := calcEntropy(mi)
assert.True(t, entropy > .95)
}

const epsilon = 1e-6

func calcEntropy(m map[interface{}]int) float64 {
if len(m) == 0 || len(m) == 1 {
return 1
}

var entropy float64
var total int
for _, v := range m {
total += v
}

for _, v := range m {
proba := float64(v) / float64(total)
if proba < epsilon {
proba = epsilon
}
entropy -= proba * math.Log2(proba)
}

return entropy / math.Log2(float64(len(m)))
}

0 comments on commit 4f0b100

Please sign in to comment.