Skip to content

Commit

Permalink
Use freecache as storage to evict old items on long tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent 233b329 commit 6842d8f
Showing 1 changed file with 35 additions and 30 deletions.
65 changes: 35 additions & 30 deletions tests/e2e/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"strconv"
"sync"

"github.com/coocood/freecache"
"github.com/prometheus/client_golang/prometheus"

"github.com/bbva/qed/api/metricshttp"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/util"
)

var (
Expand Down Expand Up @@ -116,44 +118,42 @@ func (a *alertStore) GetAll() []string {
return n
}

const segmentSize uint64 = 1 << 20

type Segment [segmentSize]*protocol.SignedSnapshot

type snapStore struct {
segments []Segment
sync.Mutex
data *freecache.Cache
}

func newSnapStore() *snapStore {
return &snapStore{segments: append([]Segment{}, Segment{})}
return &snapStore{
data: freecache.NewCache(2 << 30),
}
}

func (s *snapStore) Put(b *protocol.BatchSnapshots) {
s.Lock()
defer s.Unlock()
lastVersion := b.Snapshots[len(b.Snapshots)-1].Snapshot.Version
maxSegment := lastVersion / segmentSize
for i := uint64(len(s.segments)); i <= maxSegment; i++ {
s.segments = append(s.segments, Segment{})
}
func (s *snapStore) Put(b *protocol.BatchSnapshots) error {
for _, snap := range b.Snapshots {
targetSegment := snap.Snapshot.Version / segmentSize
targetIndex := snap.Snapshot.Version - (targetSegment * segmentSize)
s.segments[targetSegment][targetIndex] = snap
key := util.Uint64AsBytes(snap.Snapshot.Version)
val, err := snap.Encode()
if err != nil {
return err
}
s.data.Set(key, val, 0)
log.Debugf("snapStore(): saved snapshot with version ", snap.Snapshot.Version)
QedStoreEventsStoredTotal.Inc()
}
return nil
}

func (s *snapStore) Get(version uint64) *protocol.SignedSnapshot {
s.Lock()
defer s.Unlock()
targetSegment := version / segmentSize
if targetSegment >= uint64(len(s.segments)) {
return nil
func (s *snapStore) Get(version uint64) (*protocol.SignedSnapshot, error) {
var snap protocol.SignedSnapshot
key := util.Uint64AsBytes(version)
val, err := s.data.Get(key)
if err != nil {
return nil, err
}
targetIndex := version - (targetSegment * segmentSize)
return s.segments[targetSegment][targetIndex]
err = snap.Decode(val)
if err != nil {
return nil, err
}
return &snap, nil
}

type Service struct {
Expand Down Expand Up @@ -249,16 +249,18 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {
var b protocol.BatchSnapshots
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Infof("test_service(POST /batch): %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = b.Decode(buf)
if err != nil {
log.Infof("test_service(POST /batch): %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if len(b.Snapshots) < 1 {
log.Infof("Empty batch recevied!")
log.Infof("test_service(POST /batch): Empty batch recevied!")
http.Error(w, "Empty batch recevied!", http.StatusInternalServerError)
return
}
Expand All @@ -279,9 +281,10 @@ func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b := s.snaps.Get(uint64(version))
if b == nil {
http.Error(w, fmt.Sprintf("Version not found: %v", version), http.StatusUnprocessableEntity)
b, err := s.snaps.Get(uint64(version))
if err != nil {
log.Infof("test_service(GET /snapshots?v=%d): not found becasue %v", version, err)
http.Error(w, fmt.Sprintf("Version not found: %v", version), http.StatusNotFound)
return
}
buf, err := b.Encode()
Expand All @@ -304,6 +307,7 @@ func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {
if r.Method == "GET" {
b, err := json.Marshal(s.alerts.GetAll())
if err != nil {
log.Infof("test_service(GET /alert) error decoding alerts because ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -317,6 +321,7 @@ func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Infof("test_service(GET /alert) error reading alerts because ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down

0 comments on commit 6842d8f

Please sign in to comment.