Skip to content

Commit

Permalink
bloom filter
Browse files Browse the repository at this point in the history
  • Loading branch information
sado0823 committed Oct 13, 2023
1 parent 4f0b100 commit 498f5a2
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 2 deletions.
4 changes: 2 additions & 2 deletions kit/store/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"fmt"

rdsV8 "github.com/go-redis/redis/v8"

"github.com/sado0823/go-kitx/errorx"
"github.com/sado0823/go-kitx/kit/breaker"

rdsV8 "github.com/go-redis/redis/v8"
)

const (
Expand Down
142 changes: 142 additions & 0 deletions pkg/bloom/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# go-bloom
A Go bloom filter with pluggable storage backends; a Redis-based implementation is provided.



# 项目地址:

https://github.com/sado0823/go-kitx



# what?

```js
上一篇在提到缓存击穿的时候, 有一种解决办法就是布隆过滤器


布隆过滤器(英語:Bloom Filter)是1970年由布隆提出的。 它实际上是一个很长的二进制向量和一系列随机映射函数。 布隆过滤器可以用于检索一个元素是否在一个集合中。 它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难

```



# why?

```js
布隆过滤器: 可以判断某元素在不在集合里面, 但存在一定的误判率, 并且删除元素比较困难
```

一般的使用场景是:

* 防止缓存击穿(防止恶意攻击)
* 垃圾邮箱过滤
* cache digests (缓存索引)
* 模型检测器
* 判断是否存在某行数据,用以减少对磁盘访问,提高服务的访问性能



# how?

## 基本思想

通过多个`hash`方法, 进行多次hash操作, 使其值位于`bit`不同位上, 检测该`bit`上的数据是否为`1`, 从而判断是否存在

![image-20210912175241849](./image-bloom.png)



## 源码分析

`interface: bloom.go`

```go
// 过滤器的核心实现, 通过interface的方式, 可以支持多种实现
// 目前实现了基于redis bit数据类型的过滤器
Provider interface {
Add(data []byte) error
Exists(data []byte) (bool, error)
}

// Filter is a bloom filter
Filter struct {

// todo counter
total int64
hit int64
miss int64

provider Provider
}
```



`redis实现: internal/redis/redis_bit.go`

```go
// 实现Provider接口的两个方法

// Add implement Provider interface
func (r *Provider) Add(data []byte) error {
location := r.getBitLocation(data)
return r.set(location)
}

// Exists implement Provider interface
func (r *Provider) Exists(data []byte) (bool, error) {
location := r.getBitLocation(data)
return r.check(location)
}

// 核心方法
// 通过14次hash, 每次hash都在数据最后追加一个byte(index), 最后进行取模, 分布在map里面的每个区间
// 检查是否存在时, 对每个bit位进行判断, 如果有一个等于0, 则数据不存在
// getBitLocation return data hash to bit location
func (r *Provider) getBitLocation(data []byte) []uint {
l := make([]uint, maps)
for i := 0; i < maps; i++ {
hashV := r.hash(append(data, byte(i)))
l[i] = uint(hashV % uint64(maps))
}
return l
}
```



`todo`

```js
1) 可以实现统计数据, 比如总量, 命中率, 丢失率等

2) 实现其它bloom过滤器provider(目前只有基于redis bit)
```



# example

```go
func test() {
	ctx := context.Background()
	filter := NewWithProvider(NewRedisProvider("127.0.0.1:6379", "test-bloom", 1024))

	_ = filter.Add(ctx, []byte("a"))
	_ = filter.Add(ctx, []byte("b"))
	_, _ = filter.Exists(ctx, []byte("a"))
	_, _ = filter.Exists(ctx, []byte("p"))
}
```
# references
1. https://github.com/tal-tech/go-zero
2. http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
27 changes: 27 additions & 0 deletions pkg/bloom/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package bloom

import (
"context"
)

// Provider is the storage backend a Filter delegates to.
type Provider interface {
	// Add records data in the backend.
	Add(ctx context.Context, data []byte) error
	// Exists reports whether the backend considers data to be present.
	Exists(ctx context.Context, data []byte) (bool, error)
}

// Filter is a bloom filter. It embeds its Provider, so Add and Exists are
// promoted and go straight to the backend.
type Filter struct {
	// TODO: counters.
	//total int64
	//hit   int64
	//miss  int64

	Provider
}

// NewWithProvider returns a Filter whose Add and Exists calls are served by
// the given provider.
func NewWithProvider(provider Provider) *Filter {
	f := new(Filter)
	f.Provider = provider
	return f
}
Binary file added pkg/bloom/image-bloom.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
117 changes: 117 additions & 0 deletions pkg/bloom/redis_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package bloom

import (
"context"
"errors"
"fmt"
"strconv"

"github.com/sado0823/go-kitx/kit/store/redis"
"github.com/spaolacci/murmur3"
)

const (
// maps is the number of hash rounds computed for every element, i.e. how
// many bit offsets represent one element.
// for detail, see http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
maps = 14
// setScript is a Lua script that sets the bit at every offset in ARGV to 1
// in the bitmap stored under KEYS[1].
setScript = `
for _, offset in ipairs(ARGV) do
redis.call("setbit", KEYS[1], offset, 1)
end
`
// checkScript is a Lua script that returns false as soon as one of the
// offsets in ARGV is 0 in the bitmap stored under KEYS[1], and true when all
// of them are set.
checkScript = `
for _, offset in ipairs(ARGV) do
if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
return false
end
end
return true
`
)

// ErrTooLargeOffset is returned when a bit offset is not smaller than the
// number of bits the provider was configured with.
var ErrTooLargeOffset = errors.New("too large offset")

// rdsProvider is a Provider that keeps the filter's bits under one redis key.
type rdsProvider struct {
// store is the redis client the set/check scripts are evaluated on.
store *redis.Redis
// key is handed to the scripts as KEYS[1].
key string
// bits is the exclusive upper bound for offsets accepted by buildOffsetArgs.
bits uint
}

// NewRedisProvider returns a Provider that stores its bits in redis at addr
// under key, accepting offsets below bits.
func NewRedisProvider(addr string, key string, bits uint) Provider {
	provider := rdsProvider{
		store: redis.New(addr),
		key:   key,
		bits:  bits,
	}
	return &provider
}

// Add implements Provider: it hashes data to its bit offsets and sets them.
func (r *rdsProvider) Add(ctx context.Context, data []byte) error {
	return r.set(ctx, r.getBitLocation(data))
}

// Exists implements Provider: it hashes data to its bit offsets and reports
// whether all of them are set.
func (r *rdsProvider) Exists(ctx context.Context, data []byte) (bool, error) {
	return r.check(ctx, r.getBitLocation(data))
}

// getBitLocation maps data to the bit offsets that represent it in the filter.
//
// It computes `maps` hash values, each over data followed by a one-byte round
// index, and reduces every value modulo r.bits so the offsets are spread over
// the whole bitmap. Reducing modulo `maps` instead would confine every element
// to the first 14 bits and make the filter report almost everything as present.
//
// The returned slice always has length `maps`.
func (r *rdsProvider) getBitLocation(data []byte) []uint {
	offsets := make([]uint, maps)
	if r.bits == 0 {
		// A zero-sized bitmap has no valid offset and a modulo by zero would
		// panic. The zero offsets returned here are rejected by
		// buildOffsetArgs with ErrTooLargeOffset.
		return offsets
	}

	// Hash a private copy: appending to data directly could write into the
	// caller's backing array when it has spare capacity.
	buf := make([]byte, len(data)+1)
	copy(buf, data)

	size := uint64(r.bits)
	for i := range offsets {
		buf[len(data)] = byte(i)
		offsets[i] = uint(r.hash(buf) % size)
	}
	return offsets
}

// set turns on the bit at every given offset by evaluating setScript against
// the provider's key. A redis.Nil reply is treated as success.
func (r *rdsProvider) set(ctx context.Context, offsets []uint) error {
	args, err := r.buildOffsetArgs(offsets)
	if err != nil {
		return err
	}

	if _, err = r.store.Eval(ctx, setScript, []string{r.key}, args); err != nil && !errors.Is(err, redis.Nil) {
		return err
	}
	return nil
}

// check reports whether the bit at every given offset is set, by evaluating
// checkScript against the provider's key. A redis.Nil reply means "not all
// set" and is not an error; the result is true only when the reply prints
// as "1".
func (r *rdsProvider) check(ctx context.Context, offsets []uint) (bool, error) {
	args, err := r.buildOffsetArgs(offsets)
	if err != nil {
		return false, err
	}

	reply, err := r.store.Eval(ctx, checkScript, []string{r.key}, args)
	switch {
	case errors.Is(err, redis.Nil):
		return false, nil
	case err != nil:
		return false, err
	}

	return fmt.Sprint(reply) == "1", nil
}

// buildOffsetArgs renders the offsets as decimal strings suitable for script
// arguments. It fails with ErrTooLargeOffset as soon as an offset is not
// smaller than r.bits.
func (r *rdsProvider) buildOffsetArgs(offsets []uint) ([]string, error) {
	var rendered []string

	for i := 0; i < len(offsets); i++ {
		off := offsets[i]
		if off < r.bits {
			rendered = append(rendered, strconv.FormatUint(uint64(off), 10))
			continue
		}
		return nil, ErrTooLargeOffset
	}

	return rendered, nil
}

// hash returns the 64-bit murmur3 sum of data.
func (r *rdsProvider) hash(data []byte) uint64 {
return murmur3.Sum64(data)
}
75 changes: 75 additions & 0 deletions pkg/bloom/redis_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package bloom

import (
"context"
"testing"
"time"

"github.com/sado0823/go-kitx/kit/store/redis"

"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
)

// createRedis starts an in-process miniredis server and returns its address
// together with a cleanup func. The cleanup closes the server in the
// background and waits at most one second for that to finish.
func createRedis() (addr string, clean func(), err error) {
	server, err := miniredis.Run()
	if err != nil {
		return "", nil, err
	}

	shutdown := func() {
		done := make(chan struct{})
		go func() {
			server.Close()
			close(done)
		}()

		select {
		case <-time.After(time.Second):
		case <-done:
		}
	}

	return server.Addr(), shutdown, nil
}

// TestRedisBitSet_New_Set_Test checks that a bit reads as unset on a fresh
// key and reads as set after it has been written.
func TestRedisBitSet_New_Set_Test(t *testing.T) {
	addr, clean, err := createRedis()
	if err != nil {
		// clean is nil when createRedis fails, so stop before deferring it.
		t.Fatal(err)
	}
	defer clean()
	ctx := context.Background()

	bitSet := &rdsProvider{store: redis.New(addr), key: "test_key", bits: 1024}

	isSetBefore, err := bitSet.check(ctx, []uint{0})
	if err != nil {
		t.Fatal(err)
	}
	if isSetBefore {
		t.Fatal("Bit should not be set")
	}

	if err = bitSet.set(ctx, []uint{512}); err != nil {
		t.Fatal(err)
	}

	isSetAfter, err := bitSet.check(ctx, []uint{512})
	if err != nil {
		t.Fatal(err)
	}
	if !isSetAfter {
		t.Fatal("Bit should be set")
	}
}

// TestRedisBitSet_Add checks that an element reported through Add is found
// again by Exists.
func TestRedisBitSet_Add(t *testing.T) {
	addr, clean, err := createRedis()
	if err != nil {
		// clean is nil when createRedis fails, so stop before deferring it.
		t.Fatal(err)
	}
	defer clean()

	ctx := context.Background()

	filter := &rdsProvider{store: redis.New(addr), key: "test_key", bits: 1024}
	assert.Nil(t, filter.Add(ctx, []byte("hello")))
	assert.Nil(t, filter.Add(ctx, []byte("world")))
	ok, err := filter.Exists(ctx, []byte("hello"))
	assert.Nil(t, err)
	assert.True(t, ok)
}

0 comments on commit 498f5a2

Please sign in to comment.