Skip to content

Commit

Permalink
add close
Browse files Browse the repository at this point in the history
  • Loading branch information
c3mb0 committed Jul 3, 2020
1 parent 79c7e18 commit 6427456
Showing 1 changed file with 71 additions and 24 deletions.
95 changes: 71 additions & 24 deletions niftycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Cache struct {
maxExpires int
maxCallbacks int
cbLimiter chan struct{}
done chan struct{}
wg *sync.WaitGroup
closed bool
callbacks *queue.Queue
}

Expand Down Expand Up @@ -88,66 +91,107 @@ func New(ttl time.Duration, options ...Option) *Cache {
maxExpires: 10000,
maxCallbacks: 1000,
callbacks: queue.New(),
done: make(chan struct{}),
wg: new(sync.WaitGroup),
}
for _, o := range options {
o(nc)
}
nc.cbLimiter = make(chan struct{}, nc.maxCallbacks)
nc.wg.Add(2)
go nc.handleExpirations()
go nc.handleCallbacks()
return nc
}

func (nc *Cache) Close() {
close(nc.done)
nc.wg.Wait()
nc.m.Lock()
nc.items = nil
nc.ih = nil
nc.callbacks = nil
nc.closed = true
nc.m.Unlock()
}

func (nc *Cache) handleCallbacks() {
done := nc.done
leave := false
for {
nc.m.Lock()
if nc.callbacks.Length() == 0 {
select {
case <-done:
done = nil
leave = true
default:
nc.m.Lock()
if nc.callbacks.Length() == 0 {
nc.m.Unlock()
if leave {
nc.wg.Done()
return
}
time.Sleep(time.Second)
continue
}
out := nc.callbacks.Remove()
nc.m.Unlock()
time.Sleep(time.Second)
continue
cb := out.(func())
nc.cbLimiter <- struct{}{}
go func() {
cb()
<-nc.cbLimiter
}()
}
out := nc.callbacks.Remove()
nc.m.Unlock()
cb := out.(func())
nc.cbLimiter <- struct{}{}
go func() {
cb()
<-nc.cbLimiter
}()
}

}

func (nc *Cache) handleExpirations() {
for range time.Tick(time.Second) {
nc.m.Lock()
for item, j := nc.ih.peek(), 0; j < nc.maxExpires && item != nil && item.expired(); item, j = nc.ih.peek(), j+1 {
delete(nc.items, item.key)
nc.ih.pop()
if nc.expireCB != nil {
nc.callbacks.Add(nc.expireCB(item.key, item.value))
t := time.NewTicker(time.Second)
for {
select {
case <-nc.done:
t.Stop()
nc.wg.Done()
return
case <-t.C:
nc.m.Lock()
for item, j := nc.ih.peek(), 0; j < nc.maxExpires && item != nil && item.expired(); item, j = nc.ih.peek(), j+1 {
delete(nc.items, item.key)
nc.ih.pop()
if nc.expireCB != nil {
nc.callbacks.Add(nc.expireCB(item.key, item.value))
}
}
nc.m.Unlock()
}
nc.m.Unlock()
}
}

func (nc *Cache) Remove(key string) {
nc.m.Lock()
defer nc.m.Unlock()
if nc.closed {
return
}
item, ok := nc.items[key]
if !ok {
return
}
delete(nc.items, key)
nc.ih.remove(item)
if nc.removeCB != nil {
nc.callbacks.Add(nc.removeCB(item.key, item.value))
nc.callbacks.Add(nc.removeCB(key, item.value))
}
}

func (nc *Cache) Get(key string) (interface{}, bool) {
nc.m.Lock()
defer nc.m.Unlock()
if nc.closed {
return nil, false
}
item, ok := nc.items[key]
if !ok {
return nil, false
Expand All @@ -161,20 +205,23 @@ func (nc *Cache) Get(key string) (interface{}, bool) {

func (nc *Cache) Set(key string, value interface{}) {
nc.m.Lock()
defer nc.m.Unlock()
if nc.closed {
return
}
item, ok := nc.items[key]
if !ok {
item = newItem(key, value, nc.ttl)
nc.items[key] = item
nc.ih.push(item)
if nc.setCB != nil {
nc.callbacks.Add(nc.setCB(item.key, item.value))
nc.callbacks.Add(nc.setCB(key, value))
}
} else {
item.update(value)
nc.ih.update(item)
if nc.updateCB != nil {
nc.callbacks.Add(nc.updateCB(item.key, item.value))
nc.callbacks.Add(nc.updateCB(key, value))
}
}
nc.m.Unlock()
}

0 comments on commit 6427456

Please sign in to comment.