-
Notifications
You must be signed in to change notification settings - Fork 34
/
distrib.go
90 lines (75 loc) · 2.02 KB
/
distrib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package fscache
import (
"bytes"
"crypto/sha1"
"encoding/binary"
"io"
)
// Distributor provides a way to partition keys into Caches.
type Distributor interface {
// GetCache will always return the same Cache for the same key.
GetCache(key string) Cache
// Clean should wipe all the caches this Distributor manages
Clean() error
}
// stdDistribution distributes the keyspace evenly.
func stdDistribution(key string, n uint64) uint64 {
h := sha1.New()
_, _ = io.WriteString(h, key)
buf := bytes.NewBuffer(h.Sum(nil)[:8])
i, _ := binary.ReadUvarint(buf)
return i % n
}
// NewDistributor returns a Distributor which evenly distributes the keyspace
// into the passed caches.
func NewDistributor(caches ...Cache) Distributor {
if len(caches) == 0 {
return nil
}
return &distrib{
distribution: stdDistribution,
caches: caches,
size: uint64(len(caches)),
}
}
type distrib struct {
distribution func(key string, n uint64) uint64
caches []Cache
size uint64
}
func (d *distrib) GetCache(key string) Cache {
return d.caches[d.distribution(key, d.size)]
}
// Clean cleans all the caches this Distributor manages.
// It continues to clean even if one of the caches returns an error,
// but will return the first error encountered.
func (d *distrib) Clean() error {
var err1 error
for _, c := range d.caches {
if err2 := c.Clean(); err2 != nil && err1 == nil {
err1 = err2
}
}
return err1
}
// NewPartition returns a Cache which uses the Caches defined by the passed Distributor.
func NewPartition(d Distributor) Cache {
return &partition{
distributor: d,
}
}
type partition struct {
distributor Distributor
}
func (p *partition) Get(key string) (ReadAtCloser, io.WriteCloser, error) {
return p.distributor.GetCache(key).Get(key)
}
func (p *partition) Remove(key string) error {
return p.distributor.GetCache(key).Remove(key)
}
func (p *partition) Exists(key string) bool {
return p.distributor.GetCache(key).Exists(key)
}
func (p *partition) Clean() error {
return p.distributor.Clean()
}