diff --git a/kit/store/redis/redis.go b/kit/store/redis/redis.go index 53afd70..3fd5cd4 100644 --- a/kit/store/redis/redis.go +++ b/kit/store/redis/redis.go @@ -4,10 +4,10 @@ import ( "context" "fmt" - rdsV8 "github.com/go-redis/redis/v8" - "github.com/sado0823/go-kitx/errorx" "github.com/sado0823/go-kitx/kit/breaker" + + rdsV8 "github.com/go-redis/redis/v8" ) const ( diff --git a/pkg/bloom/README.md b/pkg/bloom/README.md new file mode 100644 index 0000000..5ac15ef --- /dev/null +++ b/pkg/bloom/README.md @@ -0,0 +1,142 @@ +# go-bloom +a go bloom filter , base on different implement like redis ... + + + +# 项目地址: + +https://github.com/sado0823/go-kitx + + + +# what? + +```js +上一篇在提到缓存击穿的时候, 有一种解决办法就是布隆过滤器 + + +布隆过滤器(英語:Bloom Filter)是1970年由布隆提出的。 它实际上是一个很长的二进制向量和一系列随机映射函数。 布隆过滤器可以用于检索一个元素是否在一个集合中。 它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难 + +``` + + + +# why? + +```js +布隆过滤器: 可以判断某元素在不在集合里面,因为存在一定的误判和删除复杂问题 +``` + +一般的使用场景是: + +* 防止缓存击穿(防止恶意攻击) +* 垃圾邮箱过滤 +* cache digests (缓存索引) +* 模型检测器 +* 判断是否存在某行数据,用以减少对磁盘访问,提高服务的访问性能 + + + +# how? + +## 基本思想 + +通过多个`hash`方法, 进行多次hash操作, 使其值位于`bit`不同位上, 检测该`bit`上的数据是否为`1`, 从而判断是否存在 + +![image-20210912175241849](./image-bloom.png) + + + +## 源码分析 + +`interface: bloom.go` + +```go +// 过滤器的核心实现, 通过interface的方式, 可以支持多种实现 +// 目前实现了基于redis bit数据类型的过滤器 +Provider interface { + Add(data []byte) error + Exists(data []byte) (bool, error) +} + +// Filter is a bloom filter +Filter struct { + + // todo counter + total int64 + hit int64 + miss int64 + + provider Provider +} +``` + + + +`redis实现: internal/redis/redis_bit.go` + +```js +// 实现Provider接口的两个方法 + +// Add implement Provider interface +func (r *Provider) Add(data []byte) error { + location := r.getBitLocation(data) + return r.set(location) +} + +// Exists implement Provider interface +func (r *Provider) Exists(data []byte) (bool, error) { + location := r.getBitLocation(data) + return r.check(location) +} + +// 核心方法 +// 通过14次hash, 每次hash都在数据最后追加一个byte(index), 最后进行取模, 分布在map里面的每个区间 +// 检查是否存在时, 对每个bit位进行判断, 如果有一个等于0, 则数据不存在 +// getBitLocation return data hash to bit location +func (r *Provider) getBitLocation(data []byte) []uint { + l := make([]uint, maps) + for i := 0; i < maps; i++ { + hashV := r.hash(append(data, byte(i))) + l[i] = uint(hashV % uint64(maps)) + } + return l +} +``` + + + +`todo` + +```js +1) 可以实现统计数据, 比如总量, 命中率, 丢失率等 + +2) 实现其它bloom过滤器provider(目前只有基于redis bit) +``` + + + +# example + +```go +func test() { + filter := NewRedis("127.0.0.1:6379", "test-bloom", 1024) + + _ = filter.Add([]byte("a")) + _ = filter.Add([]byte("b)) + + _, _ = filter.Exists([]byte("a)) + _, _ = filter.Exists([]byte("p)) +} + +``` + + + + + +# references + +1.https://github.com/tal-tech/go-zero + +2.http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html diff --git a/pkg/bloom/bloom.go b/pkg/bloom/bloom.go new file mode 100644 index 0000000..24ec225 --- /dev/null +++ b/pkg/bloom/bloom.go @@ -0,0 +1,27 @@ +package bloom + +import ( + "context" +) + +type ( + // Filter is a bloom filter + Filter struct { + + // todo counter + //total int64 + //hit int64 + //miss int64 + + Provider + } + + Provider interface { + Add(ctx context.Context, data []byte) error + Exists(ctx context.Context, data []byte) (bool, error) + } +) + +func NewWithProvider(provider Provider) *Filter { + return &Filter{Provider: provider} +} diff --git a/pkg/bloom/image-bloom.png b/pkg/bloom/image-bloom.png new file mode 100644 index 0000000..442cc10 Binary files /dev/null and b/pkg/bloom/image-bloom.png differ diff --git a/pkg/bloom/redis_provider.go b/pkg/bloom/redis_provider.go new file mode 100644 index 0000000..e5faace --- /dev/null +++ b/pkg/bloom/redis_provider.go @@ -0,0 +1,117 @@ +package bloom + +import ( + "context" + "errors" + "fmt" + "strconv" + + "github.com/sado0823/go-kitx/kit/store/redis" + "github.com/spaolacci/murmur3" +) + +const ( + // for detail, see http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html + maps = 14 + setScript = ` +for _, offset in ipairs(ARGV) do + redis.call("setbit", KEYS[1], offset, 1) +end +` + checkScript = ` +for _, offset in ipairs(ARGV) do + if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then + return false + end +end +return true +` +) + +var ErrTooLargeOffset = errors.New("too large offset") + +type rdsProvider struct { + store *redis.Redis + key string + bits uint +} + +func NewRedisProvider(addr string, key string, bits uint) Provider { + return &rdsProvider{store: redis.New(addr), key: key, bits: bits} +} + +// Add implement Provider interface +func (r *rdsProvider) Add(ctx context.Context, data []byte) error { + location := r.getBitLocation(data) + return r.set(ctx, location) +} + +// Exists implement Provider interface +func (r *rdsProvider) Exists(ctx context.Context, data []byte) (bool, error) { + location := r.getBitLocation(data) + return r.check(ctx, location) +} + +// getBitLocation return data hash to bit location +func (r *rdsProvider) getBitLocation(data []byte) []uint { + l := make([]uint, maps) + for i := 0; i < maps; i++ { + hashV := r.hash(append(data, byte(i))) + l[i] = uint(hashV % uint64(maps)) + } + return l +} + +// set those offsets into bloom filter +func (r *rdsProvider) set(ctx context.Context, offsets []uint) error { + args, err := r.buildOffsetArgs(offsets) + if err != nil { + return err + } + + _, err = r.store.Eval(ctx, setScript, []string{r.key}, args) + if errors.Is(err, redis.Nil) { + return nil + } + + return err +} + +// check if those offsets are in bloom filter +func (r *rdsProvider) check(ctx context.Context, offsets []uint) (bool, error) { + args, err := r.buildOffsetArgs(offsets) + if err != nil { + return false, err + } + + eval, err := r.store.Eval(ctx, checkScript, []string{r.key}, args) + if errors.Is(err, redis.Nil) { + return false, nil + } else if err != nil { + return false, err + } + + return fmt.Sprintf("%v", eval) == "1", nil +} + +// buildOffsetArgs set []uint offset to []string that can use in redis +// and check if offset is larger than r.bits +func (r *rdsProvider) buildOffsetArgs(offsets []uint) ([]string, error) { + var args []string + + for _, offset := range offsets { + if offset >= r.bits { + return nil, ErrTooLargeOffset + } + + args = append(args, strconv.FormatUint(uint64(offset), 10)) + + } + + return args, nil +} + +// hash returns the hash value of data. +func (r *rdsProvider) hash(data []byte) uint64 { + return murmur3.Sum64(data) +} diff --git a/pkg/bloom/redis_provider_test.go b/pkg/bloom/redis_provider_test.go new file mode 100644 index 0000000..a1ebbe1 --- /dev/null +++ b/pkg/bloom/redis_provider_test.go @@ -0,0 +1,75 @@ +package bloom + +import ( + "context" + "testing" + "time" + + "github.com/sado0823/go-kitx/kit/store/redis" + + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/assert" +) + +// createRedis returns an in process redis.Redis. +func createRedis() (addr string, clean func(), err error) { + mr, err := miniredis.Run() + if err != nil { + return "", nil, err + } + + return mr.Addr(), func() { + ch := make(chan struct{}) + go func() { + mr.Close() + close(ch) + }() + select { + case <-ch: + case <-time.After(time.Second): + } + }, nil +} + +func TestRedisBitSet_New_Set_Test(t *testing.T) { + addr, clean, err := createRedis() + assert.Nil(t, err) + defer clean() + ctx := context.Background() + + bitSet := &rdsProvider{store: redis.New(addr), key: "test_key", bits: 1024} + isSetBefore, err := bitSet.check(ctx, []uint{0}) + if err != nil { + t.Fatal(err) + } + if isSetBefore { + t.Fatal("Bit should not be set") + } + err = bitSet.set(ctx, []uint{512}) + if err != nil { + t.Fatal(err) + } + isSetAfter, err := bitSet.check(ctx, []uint{512}) + if err != nil { + t.Fatal(err) + } + if !isSetAfter { + t.Fatal("Bit should be set") + } + +} + +func TestRedisBitSet_Add(t *testing.T) { + addr, clean, err := createRedis() + assert.Nil(t, err) + defer clean() + + ctx := context.Background() + + filter := &rdsProvider{store: redis.New(addr), key: "test_key", bits: 1024} + assert.Nil(t, filter.Add(ctx, []byte("hello"))) + assert.Nil(t, filter.Add(ctx, []byte("world"))) + ok, err := filter.Exists(ctx, []byte("hello")) + assert.Nil(t, err) + assert.True(t, ok) +}