forked from grandecola/bigqueue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.go
131 lines (113 loc) · 3.79 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package bigqueue
import (
"errors"
"os"
"time"
"github.com/jonboulle/clockwork"
)
const (
cDefaultArenaSize = 128 * 1024 * 1024
// tail, head and a buffer arena, hence 3
cMinMaxInMemArenas = 3
// values chosen arbitrarily
cDefaultMutOps = 1000
cDefaultflushPeriod = time.Minute
)
var (
// ErrTooSmallArenaSize is returned when arena size is smaller than OS page size
ErrTooSmallArenaSize = errors.New("too small arena size")
// ErrTooFewInMemArenas is returned when number of arenas allowed in memory < 3
ErrTooFewInMemArenas = errors.New("too few in memory arenas")
// ErrMustBeGreaterThanZero is returned when a config value has non-positive value
ErrMustBeGreaterThanZero = errors.New("must be greater than zero")
// singleton instance of real clock.
// TODO: not needed once https://github.com/jonboulle/clockwork/pull/14 is merged
realClock = clockwork.NewRealClock()
)
// bqConfig stores all the configuration related to bigqueue
type bqConfig struct {
arenaSize int
maxInMemArenas int
flushMutOps int64
flushPeriod time.Duration
clock clockwork.Clock
}
// Option is function type that takes a bqConfig object
// and sets various config parameters in the object
type Option func(*bqConfig) error
// newConfig creates an object of bqConfig with default parameter values
func newConfig() *bqConfig {
return &bqConfig{
arenaSize: cDefaultArenaSize,
maxInMemArenas: cMinMaxInMemArenas,
flushMutOps: cDefaultMutOps,
flushPeriod: cDefaultflushPeriod,
clock: realClock,
}
}
// SetArenaSize returns an Option closure that sets the arena size
func SetArenaSize(arenaSize int) Option {
return func(c *bqConfig) error {
if arenaSize < os.Getpagesize() {
return ErrTooSmallArenaSize
}
c.arenaSize = arenaSize
return nil
}
}
// SetMaxInMemArenas returns an Option closure that sets maximum number of
// Arenas that could reside in memory (RAM) at any time. By default, all the
// arenas reside in memory and Operating System takes care of memory by paging
// in and out the pages from disk.
// A recommended value of maximum arenas that should be in memory
// is chosen such that -
// maxInMemArenas > 2 + (maximum message size / arena size)
// maxInMemArenas < (total available system memory - 1GB) / arena size
func SetMaxInMemArenas(maxInMemArenas int) Option {
return func(c *bqConfig) error {
if maxInMemArenas != 0 && maxInMemArenas < cMinMaxInMemArenas {
return ErrTooFewInMemArenas
}
c.maxInMemArenas = maxInMemArenas
return nil
}
}
// SetPeriodicFlushOps returns an Option that sets the number of
// mutate operations (enqueue/dequeue) after which the queue's in-memory
// changes will be flushed to disk. This is a best effort flush and number
// of mutate operations is checked upon an enqueue/dequeue.
//
// For durability this value should be low.
// For performance this value should be high.
func SetPeriodicFlushOps(flushMutOps int64) Option {
return func(c *bqConfig) error {
if flushMutOps < 1 {
return ErrMustBeGreaterThanZero
}
c.flushMutOps = flushMutOps
return nil
}
}
// SetPeriodicFlushDuration returns an Option that sets a periodic
// flush every given duration after which the queue's in-memory changes
// will be flushed to disk. This is a best effort flush and elapsed time is
// checked upon an enqueue/dequeue only.
//
// For durability this value should be low.
// For performance this value should be high.
func SetPeriodicFlushDuration(flushPeriod time.Duration) Option {
// TODO: in future we should do a timely flush from a background scheduled goroutine
return func(c *bqConfig) error {
if flushPeriod < 1 {
return ErrMustBeGreaterThanZero
}
c.flushPeriod = flushPeriod
return nil
}
}
func setClock(clock clockwork.Clock) Option {
return func(c *bqConfig) error {
c.clock = clock
return nil
}
}