forked from jellydator/ttlcache
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.go
283 lines (244 loc) · 7.27 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package ttlcache
import (
"sync"
"time"
)
// CheckExpireCallback is used as a callback for an external check on item expiration
type checkExpireCallback func(key string, value interface{}) bool
// ExpireCallback is used as a callback on item expiration or when notifying of an item new to the cache
type expireCallback func(key string, value interface{})
// Cache is a synchronized map of items that can auto-expire once stale
type Cache struct {
mutex sync.Mutex
ttl time.Duration
items map[string]*item
expireCallback expireCallback
checkExpireCallback checkExpireCallback
newItemCallback expireCallback
priorityQueue *priorityQueue
expirationNotification chan bool
expirationTime time.Time
skipTTLExtension bool
shutdownSignal chan (chan struct{})
isShutDown bool
}
func (cache *Cache) getItem(key string) (*item, bool, bool) {
item, exists := cache.items[key]
if !exists || item.expired() {
return nil, false, false
}
if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
if cache.ttl > 0 && item.ttl == 0 {
item.ttl = cache.ttl
}
if !cache.skipTTLExtension {
item.touch()
}
cache.priorityQueue.update(item)
}
expirationNotification := false
if cache.expirationTime.After(time.Now().Add(item.ttl)) {
expirationNotification = true
}
return item, exists, expirationNotification
}
func (cache *Cache) startExpirationProcessing() {
timer := time.NewTimer(time.Hour)
for {
var sleepTime time.Duration
cache.mutex.Lock()
if cache.priorityQueue.Len() > 0 {
sleepTime = time.Until(cache.priorityQueue.items[0].expireAt)
if sleepTime < 0 && cache.priorityQueue.items[0].expireAt.IsZero() {
sleepTime = time.Hour
} else if sleepTime < 0 {
sleepTime = time.Microsecond
}
if cache.ttl > 0 {
sleepTime = min(sleepTime, cache.ttl)
}
} else if cache.ttl > 0 {
sleepTime = cache.ttl
} else {
sleepTime = time.Hour
}
cache.expirationTime = time.Now().Add(sleepTime)
cache.mutex.Unlock()
timer.Reset(sleepTime)
select {
case shutdownFeedback := <-cache.shutdownSignal:
timer.Stop()
shutdownFeedback <- struct{}{}
return
case <-timer.C:
timer.Stop()
cache.mutex.Lock()
if cache.priorityQueue.Len() == 0 {
cache.mutex.Unlock()
continue
}
// index will only be advanced if the current entry will not be evicted
i := 0
for item := cache.priorityQueue.items[i]; item.expired(); item = cache.priorityQueue.items[i] {
if cache.checkExpireCallback != nil {
if !cache.checkExpireCallback(item.key, item.data) {
item.touch()
cache.priorityQueue.update(item)
i++
if i == cache.priorityQueue.Len() {
break
}
continue
}
}
cache.priorityQueue.remove(item)
delete(cache.items, item.key)
if cache.expireCallback != nil {
go cache.expireCallback(item.key, item.data)
}
if cache.priorityQueue.Len() == 0 {
goto done
}
}
done:
cache.mutex.Unlock()
case <-cache.expirationNotification:
timer.Stop()
continue
}
}
}
// Close calls Purge, and then stops the goroutine that does ttl checking, for a clean shutdown.
// The cache is no longer cleaning up after the first call to Close, repeated calls are safe though.
func (cache *Cache) Close() {
cache.mutex.Lock()
if !cache.isShutDown {
cache.isShutDown = true
cache.mutex.Unlock()
feedback := make(chan struct{})
cache.shutdownSignal <- feedback
<-feedback
close(cache.shutdownSignal)
} else {
cache.mutex.Unlock()
}
cache.Purge()
}
// Set is a thread-safe way to add new items to the map
func (cache *Cache) Set(key string, data interface{}) {
cache.SetWithTTL(key, data, ItemExpireWithGlobalTTL)
}
// SetWithTTL is a thread-safe way to add new items to the map with individual ttl
func (cache *Cache) SetWithTTL(key string, data interface{}, ttl time.Duration) {
cache.mutex.Lock()
item, exists, _ := cache.getItem(key)
if exists {
item.data = data
item.ttl = ttl
} else {
item = newItem(key, data, ttl)
cache.items[key] = item
}
if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
if cache.ttl > 0 && item.ttl == 0 {
item.ttl = cache.ttl
}
item.touch()
}
if exists {
cache.priorityQueue.update(item)
} else {
cache.priorityQueue.push(item)
}
cache.mutex.Unlock()
if !exists && cache.newItemCallback != nil {
cache.newItemCallback(key, data)
}
cache.expirationNotification <- true
}
// Get is a thread-safe way to lookup items
// Every lookup, also touches the item, hence extending it's life
func (cache *Cache) Get(key string) (interface{}, bool) {
cache.mutex.Lock()
item, exists, triggerExpirationNotification := cache.getItem(key)
var dataToReturn interface{}
if exists {
dataToReturn = item.data
}
cache.mutex.Unlock()
if triggerExpirationNotification {
cache.expirationNotification <- true
}
return dataToReturn, exists
}
func (cache *Cache) Remove(key string) bool {
cache.mutex.Lock()
object, exists := cache.items[key]
if !exists {
cache.mutex.Unlock()
return false
}
delete(cache.items, object.key)
cache.priorityQueue.remove(object)
cache.mutex.Unlock()
return true
}
// Count returns the number of items in the cache
func (cache *Cache) Count() int {
cache.mutex.Lock()
length := len(cache.items)
cache.mutex.Unlock()
return length
}
func (cache *Cache) SetTTL(ttl time.Duration) {
cache.mutex.Lock()
cache.ttl = ttl
cache.mutex.Unlock()
cache.expirationNotification <- true
}
// SetExpirationCallback sets a callback that will be called when an item expires
func (cache *Cache) SetExpirationCallback(callback expireCallback) {
cache.expireCallback = callback
}
// SetCheckExpirationCallback sets a callback that will be called when an item is about to expire
// in order to allow external code to decide whether the item expires or remains for another TTL cycle
func (cache *Cache) SetCheckExpirationCallback(callback checkExpireCallback) {
cache.checkExpireCallback = callback
}
// SetNewItemCallback sets a callback that will be called when a new item is added to the cache
func (cache *Cache) SetNewItemCallback(callback expireCallback) {
cache.newItemCallback = callback
}
// SkipTtlExtensionOnHit allows the user to change the cache behaviour. When this flag is set to true it will
// no longer extend TTL of items when they are retrieved using Get, or when their expiration condition is evaluated
// using SetCheckExpirationCallback.
func (cache *Cache) SkipTtlExtensionOnHit(value bool) {
cache.skipTTLExtension = value
}
// Purge will remove all entries
func (cache *Cache) Purge() {
cache.mutex.Lock()
cache.items = make(map[string]*item)
cache.priorityQueue = newPriorityQueue()
cache.mutex.Unlock()
}
// NewCache is a helper to create instance of the Cache struct
func NewCache() *Cache {
shutdownChan := make(chan chan struct{})
cache := &Cache{
items: make(map[string]*item),
priorityQueue: newPriorityQueue(),
expirationNotification: make(chan bool),
expirationTime: time.Now(),
shutdownSignal: shutdownChan,
isShutDown: false,
}
go cache.startExpirationProcessing()
return cache
}
func min(duration time.Duration, second time.Duration) time.Duration {
if duration < second {
return duration
}
return second
}